Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/07 10:04:30 UTC

[GitHub] [airflow] pankajkoti opened a new pull request, #22808: Add example DAG for demonstrating usage of GCS sensors

pankajkoti opened a new pull request, #22808:
URL: https://github.com/apache/airflow/pull/22808

   The following GCS sensor examples are provided as part of this change (a brief usage sketch follows the list):
   1. GCSUploadSessionCompleteSensor
   2. GCSObjectUpdateSensor
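
   A minimal usage sketch of the two sensors follows; the bucket, prefix, and object names are placeholders, and the parameter values are illustrative assumptions, not taken from the PR itself:

   ```
   from airflow.providers.google.cloud.sensors.gcs import (
       GCSObjectUpdateSensor,
       GCSUploadSessionCompleteSensor,
   )

   # Succeeds once at least `min_objects` objects exist under `prefix` and no
   # new objects have appeared for `inactivity_period` seconds.
   gcs_upload_session_complete = GCSUploadSessionCompleteSensor(
       task_id="gcs_upload_session_complete",
       bucket="test-gcs-example-bucket",  # placeholder
       prefix="test-gcs-",
       inactivity_period=15,
       min_objects=1,
   )

   # Succeeds once the object's last-updated timestamp is newer than the value
   # returned by its `ts_func` (by default, the DAG's execution date).
   gcs_object_update = GCSObjectUpdateSensor(
       task_id="gcs_object_update",
       bucket="test-gcs-example-bucket",  # placeholder
       object="test-gcs-example-upload.txt",
   )
   ```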


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] bhirsz commented on a diff in pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
bhirsz commented on code in PR #22808:
URL: https://github.com/apache/airflow/pull/22808#discussion_r848099656


##########
airflow/providers/google/cloud/example_dags/example_gcs.py:
##########
@@ -37,35 +37,44 @@
 from airflow.providers.google.cloud.sensors.gcs import (
     GCSObjectExistenceSensor,
     GCSObjectsWithPrefixExistenceSensor,
+    GCSObjectUpdateSensor,
+    GCSUploadSessionCompleteSensor,
 )
 from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
 from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 
 START_DATE = datetime(2021, 1, 1)
 
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
-BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
-GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
+PROJECT_ID = os.getenv("GCP_PROJECT_ID", "example-id")
+BUCKET_1 = os.getenv("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
+GCS_ACL_ENTITY = os.getenv("GCS_ACL_ENTITY", "allUsers")
 GCS_ACL_BUCKET_ROLE = "OWNER"
 GCS_ACL_OBJECT_ROLE = "OWNER"
 
-BUCKET_2 = os.environ.get("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
+BUCKET_2 = os.getenv("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
 
 temp_dir_path = gettempdir()
-PATH_TO_TRANSFORM_SCRIPT = os.environ.get(
+PATH_TO_TRANSFORM_SCRIPT = os.getenv(
     "GCP_GCS_PATH_TO_TRANSFORM_SCRIPT", os.path.join(temp_dir_path, "transform_script.py")
 )
-PATH_TO_UPLOAD_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE = os.getenv(
     "GCP_GCS_PATH_TO_UPLOAD_FILE", os.path.join(temp_dir_path, "test-gcs-example-upload.txt")
 )
-PATH_TO_UPLOAD_FILE_PREFIX = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
-PATH_TO_SAVED_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE_PREFIX = os.getenv("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
+PATH_TO_SAVED_FILE = os.getenv(
     "GCP_GCS_PATH_TO_SAVED_FILE", os.path.join(temp_dir_path, "test-gcs-example-download.txt")
 )
 
 BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
 
+# Upload 'test-gcs-manual-example-upload.txt' manually in the <BUCKET_1> after triggering the DAG.

Review Comment:
   Manual steps are unfortunately a big no for us, since the plan is to run the system tests in the community CI. We can't expect people to perform manual tasks while running them, so I'm looking for an automated solution.





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on PR #22808:
URL: https://github.com/apache/airflow/pull/22808#issuecomment-1092921127

   Awesome work, congrats on your first merged pull request!
   




[GitHub] [airflow] pankajkoti commented on a diff in pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
pankajkoti commented on code in PR #22808:
URL: https://github.com/apache/airflow/pull/22808#discussion_r848059666


##########
airflow/providers/google/cloud/example_dags/example_gcs.py:
##########
@@ -37,35 +37,44 @@
 from airflow.providers.google.cloud.sensors.gcs import (
     GCSObjectExistenceSensor,
     GCSObjectsWithPrefixExistenceSensor,
+    GCSObjectUpdateSensor,
+    GCSUploadSessionCompleteSensor,
 )
 from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
 from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 
 START_DATE = datetime(2021, 1, 1)
 
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
-BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
-GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
+PROJECT_ID = os.getenv("GCP_PROJECT_ID", "example-id")
+BUCKET_1 = os.getenv("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
+GCS_ACL_ENTITY = os.getenv("GCS_ACL_ENTITY", "allUsers")
 GCS_ACL_BUCKET_ROLE = "OWNER"
 GCS_ACL_OBJECT_ROLE = "OWNER"
 
-BUCKET_2 = os.environ.get("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
+BUCKET_2 = os.getenv("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
 
 temp_dir_path = gettempdir()
-PATH_TO_TRANSFORM_SCRIPT = os.environ.get(
+PATH_TO_TRANSFORM_SCRIPT = os.getenv(
     "GCP_GCS_PATH_TO_TRANSFORM_SCRIPT", os.path.join(temp_dir_path, "transform_script.py")
 )
-PATH_TO_UPLOAD_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE = os.getenv(
     "GCP_GCS_PATH_TO_UPLOAD_FILE", os.path.join(temp_dir_path, "test-gcs-example-upload.txt")
 )
-PATH_TO_UPLOAD_FILE_PREFIX = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
-PATH_TO_SAVED_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE_PREFIX = os.getenv("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
+PATH_TO_SAVED_FILE = os.getenv(
     "GCP_GCS_PATH_TO_SAVED_FILE", os.path.join(temp_dir_path, "test-gcs-example-download.txt")
 )
 
 BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
 
+# Upload 'test-gcs-manual-example-upload.txt' manually in the <BUCKET_1> after triggering the DAG.

Review Comment:
   Hi @bhirsz, I wanted to upload the file programmatically like the other chain. However, the GCSUploadSessionCompleteSensor task waits for a change in the number of files in the bucket. If we place the upload task before the GCSUploadSessionCompleteSensor task, the sensor won't detect any changes. On the other hand, if we add the upload task after the GCSUploadSessionCompleteSensor task, the upload will be blocked until the sensor completes, which does not solve the need either. I am unsure how to add a dependency between such tasks. Any suggestions?
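   
   To make the ordering problem concrete, a sketch of the two possible chains, using the task names from this thread as placeholders (`chain` is `airflow.models.baseoperator.chain`):
   
   ```
   from airflow.models.baseoperator import chain
   
   # Option A: upload before the sensor. By the time the sensor starts poking,
   # the object count under the prefix is already stable, so it never observes
   # the change it is waiting for (per the behaviour described above).
   chain(create_bucket, upload_file, gcs_upload_session_complete)
   
   # Option B (sensor before the upload) would read:
   #   chain(create_bucket, gcs_upload_session_complete, upload_file)
   # but then the upload cannot start until the sensor succeeds, while the
   # sensor is waiting for exactly that upload.
   ```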





[GitHub] [airflow] bhirsz commented on pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
bhirsz commented on PR #22808:
URL: https://github.com/apache/airflow/pull/22808#issuecomment-1096205067

   Hi, 
   are you aware of [AIP-47](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-47+New+design+of+Airflow+System+Tests)? We should now strive to use the new design of system tests. There is an open PR migrating the same files you modified in this PR: #22778 . You can use it as a reference in the future. I will also migrate your changes - although I have some comments, so I will leave them in this PR as well (no action required, just general tips :) ).




[GitHub] [airflow] kaxil commented on pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
kaxil commented on PR #22808:
URL: https://github.com/apache/airflow/pull/22808#issuecomment-1092867747

   @pankajkoti Static check is failing: https://github.com/apache/airflow/runs/5885329724?check_suite_focus=true#step:11:296




[GitHub] [airflow] pankajkoti commented on pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
pankajkoti commented on PR #22808:
URL: https://github.com/apache/airflow/pull/22808#issuecomment-1092874164

   @kaxil yes, I love it too, thank you. I had missed installing it earlier; I've installed it now and pushed the fix.




[GitHub] [airflow] pankajkoti commented on a diff in pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
pankajkoti commented on code in PR #22808:
URL: https://github.com/apache/airflow/pull/22808#discussion_r848106371


##########
airflow/providers/google/cloud/example_dags/example_gcs.py:
##########
@@ -37,35 +37,44 @@
 from airflow.providers.google.cloud.sensors.gcs import (
     GCSObjectExistenceSensor,
     GCSObjectsWithPrefixExistenceSensor,
+    GCSObjectUpdateSensor,
+    GCSUploadSessionCompleteSensor,
 )
 from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
 from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 
 START_DATE = datetime(2021, 1, 1)
 
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
-BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
-GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
+PROJECT_ID = os.getenv("GCP_PROJECT_ID", "example-id")
+BUCKET_1 = os.getenv("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
+GCS_ACL_ENTITY = os.getenv("GCS_ACL_ENTITY", "allUsers")
 GCS_ACL_BUCKET_ROLE = "OWNER"
 GCS_ACL_OBJECT_ROLE = "OWNER"
 
-BUCKET_2 = os.environ.get("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
+BUCKET_2 = os.getenv("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
 
 temp_dir_path = gettempdir()
-PATH_TO_TRANSFORM_SCRIPT = os.environ.get(
+PATH_TO_TRANSFORM_SCRIPT = os.getenv(
     "GCP_GCS_PATH_TO_TRANSFORM_SCRIPT", os.path.join(temp_dir_path, "transform_script.py")
 )
-PATH_TO_UPLOAD_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE = os.getenv(
     "GCP_GCS_PATH_TO_UPLOAD_FILE", os.path.join(temp_dir_path, "test-gcs-example-upload.txt")
 )
-PATH_TO_UPLOAD_FILE_PREFIX = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
-PATH_TO_SAVED_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE_PREFIX = os.getenv("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
+PATH_TO_SAVED_FILE = os.getenv(
     "GCP_GCS_PATH_TO_SAVED_FILE", os.path.join(temp_dir_path, "test-gcs-example-download.txt")
 )
 
 BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
 
+# Upload 'test-gcs-manual-example-upload.txt' manually in the <BUCKET_1> after triggering the DAG.

Review Comment:
   Okay, I was not aware that we're migrating these example DAGs to tests. I was under the impression that the example DAGs serve as references for our community on how to implement DAGs for certain operators. I understand and totally agree with your fair point that tests should not need manual intervention. 
   Okay, so we can try introducing a sleep if we feel right about it. I am new to contributing as well :) 
   
   cc: @kaxil 





[GitHub] [airflow] kaxil commented on pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
kaxil commented on PR #22808:
URL: https://github.com/apache/airflow/pull/22808#issuecomment-1092868943

   > @pankajkoti Static check is failing: https://github.com/apache/airflow/runs/5885329724?check_suite_focus=true#step:11:296
   
   I recommend installing and using the pre-commit hooks so this will be checked automatically when you run "git commit" -> this is explained in https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#enabling-pre-commit-hooks




[GitHub] [airflow] bhirsz commented on a diff in pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
bhirsz commented on code in PR #22808:
URL: https://github.com/apache/airflow/pull/22808#discussion_r848047726


##########
airflow/providers/google/cloud/example_dags/example_gcs.py:
##########
@@ -37,35 +37,44 @@
 from airflow.providers.google.cloud.sensors.gcs import (
     GCSObjectExistenceSensor,
     GCSObjectsWithPrefixExistenceSensor,
+    GCSObjectUpdateSensor,
+    GCSUploadSessionCompleteSensor,
 )
 from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
 from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 
 START_DATE = datetime(2021, 1, 1)
 
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
-BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
-GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
+PROJECT_ID = os.getenv("GCP_PROJECT_ID", "example-id")
+BUCKET_1 = os.getenv("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
+GCS_ACL_ENTITY = os.getenv("GCS_ACL_ENTITY", "allUsers")
 GCS_ACL_BUCKET_ROLE = "OWNER"
 GCS_ACL_OBJECT_ROLE = "OWNER"
 
-BUCKET_2 = os.environ.get("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
+BUCKET_2 = os.getenv("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
 
 temp_dir_path = gettempdir()
-PATH_TO_TRANSFORM_SCRIPT = os.environ.get(
+PATH_TO_TRANSFORM_SCRIPT = os.getenv(
     "GCP_GCS_PATH_TO_TRANSFORM_SCRIPT", os.path.join(temp_dir_path, "transform_script.py")
 )
-PATH_TO_UPLOAD_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE = os.getenv(
     "GCP_GCS_PATH_TO_UPLOAD_FILE", os.path.join(temp_dir_path, "test-gcs-example-upload.txt")
 )
-PATH_TO_UPLOAD_FILE_PREFIX = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
-PATH_TO_SAVED_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE_PREFIX = os.getenv("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
+PATH_TO_SAVED_FILE = os.getenv(
     "GCP_GCS_PATH_TO_SAVED_FILE", os.path.join(temp_dir_path, "test-gcs-example-download.txt")
 )
 
 BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
 
+# Upload 'test-gcs-manual-example-upload.txt' manually in the <BUCKET_1> after triggering the DAG.

Review Comment:
   Tests shouldn't depend on manual steps - if the file is required we should:
   
   1) store it in the test's resources,
   2) upload it using an operator, for example LocalFilesystemToGCSOperator,
   3) after the tests, remove any resources created during the tests (usually it's enough to remove the bucket) - see the sketch below.
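   
   A minimal sketch of steps 2) and 3), reusing the constants from the quoted diff; treat it as an illustration under those assumptions, not the final test layout:
   
   ```
   from airflow.providers.google.cloud.operators.gcs import GCSDeleteBucketOperator
   from airflow.providers.google.cloud.transfers.local_to_gcs import (
       LocalFilesystemToGCSOperator,
   )
   
   # Setup: upload the file kept in the test's resources.
   upload_file = LocalFilesystemToGCSOperator(
       task_id="upload_file",
       src=PATH_TO_UPLOAD_FILE,
       dst=BUCKET_FILE_LOCATION,
       bucket=BUCKET_1,
   )
   
   # Teardown: deleting the bucket removes every object created during the test.
   delete_bucket = GCSDeleteBucketOperator(
       task_id="delete_bucket",
       bucket_name=BUCKET_1,
   )
   ```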
   





[GitHub] [airflow] bhirsz commented on a diff in pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
bhirsz commented on code in PR #22808:
URL: https://github.com/apache/airflow/pull/22808#discussion_r848080946


##########
airflow/providers/google/cloud/example_dags/example_gcs.py:
##########
@@ -37,35 +37,44 @@
 from airflow.providers.google.cloud.sensors.gcs import (
     GCSObjectExistenceSensor,
     GCSObjectsWithPrefixExistenceSensor,
+    GCSObjectUpdateSensor,
+    GCSUploadSessionCompleteSensor,
 )
 from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
 from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 
 START_DATE = datetime(2021, 1, 1)
 
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
-BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
-GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
+PROJECT_ID = os.getenv("GCP_PROJECT_ID", "example-id")
+BUCKET_1 = os.getenv("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
+GCS_ACL_ENTITY = os.getenv("GCS_ACL_ENTITY", "allUsers")
 GCS_ACL_BUCKET_ROLE = "OWNER"
 GCS_ACL_OBJECT_ROLE = "OWNER"
 
-BUCKET_2 = os.environ.get("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
+BUCKET_2 = os.getenv("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
 
 temp_dir_path = gettempdir()
-PATH_TO_TRANSFORM_SCRIPT = os.environ.get(
+PATH_TO_TRANSFORM_SCRIPT = os.getenv(
     "GCP_GCS_PATH_TO_TRANSFORM_SCRIPT", os.path.join(temp_dir_path, "transform_script.py")
 )
-PATH_TO_UPLOAD_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE = os.getenv(
     "GCP_GCS_PATH_TO_UPLOAD_FILE", os.path.join(temp_dir_path, "test-gcs-example-upload.txt")
 )
-PATH_TO_UPLOAD_FILE_PREFIX = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
-PATH_TO_SAVED_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE_PREFIX = os.getenv("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
+PATH_TO_SAVED_FILE = os.getenv(
     "GCP_GCS_PATH_TO_SAVED_FILE", os.path.join(temp_dir_path, "test-gcs-example-download.txt")
 )
 
 BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
 
+# Upload 'test-gcs-manual-example-upload.txt' manually in the <BUCKET_1> after triggering the DAG.

Review Comment:
   It's tricky in this case, indeed. How about starting the sensors in parallel with upload_file? 
   
   Example:
   ```
       chain(
           # TEST SETUP
           create_bucket,
           upload_file,
           # TEST BODY
           [gcs_object_exists, gcs_object_with_prefix_exists],
           # TEST TEARDOWN
           delete_bucket,
       )
       chain(
           create_bucket,
           # TEST BODY
           [gcs_upload_session_complete, gcs_update_object_exists],
           delete_bucket
       )
   ```
   
   ![image](https://user-images.githubusercontent.com/8532066/162905393-61247c98-9661-40b3-97db-9a2f2aeb660c.png)
   
   We're starting the sensors, and in the meantime we're uploading the file - and the sensors detect it:
   ![image](https://user-images.githubusercontent.com/8532066/162905681-eab7e6a1-967b-43ac-9582-aac5e21dea16.png)
   
   ![image](https://user-images.githubusercontent.com/8532066/162905856-f8a093b6-85cf-46d3-b2f6-461e2dbbe51a.png)
   
   I'm not sure, though, whether it will turn out flaky in some cases - running it in CI will show. I will update my PR with this change.





[GitHub] [airflow] pankajkoti commented on a diff in pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
pankajkoti commented on code in PR #22808:
URL: https://github.com/apache/airflow/pull/22808#discussion_r848088855


##########
airflow/providers/google/cloud/example_dags/example_gcs.py:
##########
@@ -37,35 +37,44 @@
 from airflow.providers.google.cloud.sensors.gcs import (
     GCSObjectExistenceSensor,
     GCSObjectsWithPrefixExistenceSensor,
+    GCSObjectUpdateSensor,
+    GCSUploadSessionCompleteSensor,
 )
 from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
 from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 
 START_DATE = datetime(2021, 1, 1)
 
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
-BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
-GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
+PROJECT_ID = os.getenv("GCP_PROJECT_ID", "example-id")
+BUCKET_1 = os.getenv("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
+GCS_ACL_ENTITY = os.getenv("GCS_ACL_ENTITY", "allUsers")
 GCS_ACL_BUCKET_ROLE = "OWNER"
 GCS_ACL_OBJECT_ROLE = "OWNER"
 
-BUCKET_2 = os.environ.get("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
+BUCKET_2 = os.getenv("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
 
 temp_dir_path = gettempdir()
-PATH_TO_TRANSFORM_SCRIPT = os.environ.get(
+PATH_TO_TRANSFORM_SCRIPT = os.getenv(
     "GCP_GCS_PATH_TO_TRANSFORM_SCRIPT", os.path.join(temp_dir_path, "transform_script.py")
 )
-PATH_TO_UPLOAD_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE = os.getenv(
     "GCP_GCS_PATH_TO_UPLOAD_FILE", os.path.join(temp_dir_path, "test-gcs-example-upload.txt")
 )
-PATH_TO_UPLOAD_FILE_PREFIX = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
-PATH_TO_SAVED_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE_PREFIX = os.getenv("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
+PATH_TO_SAVED_FILE = os.getenv(
     "GCP_GCS_PATH_TO_SAVED_FILE", os.path.join(temp_dir_path, "test-gcs-example-download.txt")
 )
 
 BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
 
+# Upload 'test-gcs-manual-example-upload.txt' manually in the <BUCKET_1> after triggering the DAG.

Review Comment:
   Right. Since we are starting in parallel, it may happen that the upload task is picked up before the sensor task begins, and the sensor may not detect the change as expected. Hence, I added the comment to manually upload the file. :)
   
   Also, the gcs_object_update_sensor_task needs to be activated after the gcs_upload_session_complete_task (and not in parallel with it), as the object is first expected to be detected by the session sensor; the object-update task then confirms the manually uploaded file exists.
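   
   In other words, the intended ordering is sequential, sketched here with the task names from this thread as placeholders:
   
   ```
   from airflow.models.baseoperator import chain
   
   # The update sensor only runs after the upload session has been observed
   # to complete, confirming the manually uploaded file exists.
   chain(gcs_upload_session_complete_task, gcs_object_update_sensor_task)
   ```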





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on PR #22808:
URL: https://github.com/apache/airflow/pull/22808#issuecomment-1091475319

   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   




[GitHub] [airflow] kaxil commented on pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
kaxil commented on PR #22808:
URL: https://github.com/apache/airflow/pull/22808#issuecomment-1092921486

   Well done. Congratulations on your first merged PR 👍 




[GitHub] [airflow] kaxil commented on a diff in pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #22808:
URL: https://github.com/apache/airflow/pull/22808#discussion_r845013057


##########
airflow/example_dags/example_gcs.py:
##########
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This is an example dag for demonstrating usage of GCS sensors.
+"""
+import os
+from datetime import timedelta
+
+import pendulum
+
+from airflow import DAG
+from airflow.providers.google.cloud.sensors.gcs import GCSUploadSessionCompleteSensor, GCSObjectUpdateSensor
+
+TEST_BUCKET = os.getenv("GCP_TEST_BUCKET", "test-gcs-bucket")
+GCP_CONN_ID = os.getenv("GCP_CONN_ID", "google_cloud_default")
+
+# Upload example_test.txt in the <TEST_BUCKET> after triggering the DAG.
+BUCKET_FILE_LOCATION = "example_test.txt"
+
+# This is the upload file name prefix the sensor will be waiting for.
+PATH_TO_UPLOAD_FILE_PREFIX = "example_"
+
+DEFAULT_ARGS = {
+    "execution_timeout": timedelta(minutes=30),
+}
+
+with DAG(
+    "example_gcs_sensors",
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    schedule_interval=None,
+    catchup=False,
+    default_args=DEFAULT_ARGS,
+    tags=["example", "gcs"],
+) as dag:

Review Comment:
   Since we only use it once





[GitHub] [airflow] bhirsz commented on a diff in pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
bhirsz commented on code in PR #22808:
URL: https://github.com/apache/airflow/pull/22808#discussion_r848098678


##########
airflow/providers/google/cloud/example_dags/example_gcs.py:
##########
@@ -37,35 +37,44 @@
 from airflow.providers.google.cloud.sensors.gcs import (
     GCSObjectExistenceSensor,
     GCSObjectsWithPrefixExistenceSensor,
+    GCSObjectUpdateSensor,
+    GCSUploadSessionCompleteSensor,
 )
 from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
 from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 
 START_DATE = datetime(2021, 1, 1)
 
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
-BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
-GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
+PROJECT_ID = os.getenv("GCP_PROJECT_ID", "example-id")
+BUCKET_1 = os.getenv("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
+GCS_ACL_ENTITY = os.getenv("GCS_ACL_ENTITY", "allUsers")
 GCS_ACL_BUCKET_ROLE = "OWNER"
 GCS_ACL_OBJECT_ROLE = "OWNER"
 
-BUCKET_2 = os.environ.get("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
+BUCKET_2 = os.getenv("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
 
 temp_dir_path = gettempdir()
-PATH_TO_TRANSFORM_SCRIPT = os.environ.get(
+PATH_TO_TRANSFORM_SCRIPT = os.getenv(
     "GCP_GCS_PATH_TO_TRANSFORM_SCRIPT", os.path.join(temp_dir_path, "transform_script.py")
 )
-PATH_TO_UPLOAD_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE = os.getenv(
     "GCP_GCS_PATH_TO_UPLOAD_FILE", os.path.join(temp_dir_path, "test-gcs-example-upload.txt")
 )
-PATH_TO_UPLOAD_FILE_PREFIX = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
-PATH_TO_SAVED_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE_PREFIX = os.getenv("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
+PATH_TO_SAVED_FILE = os.getenv(
     "GCP_GCS_PATH_TO_SAVED_FILE", os.path.join(temp_dir_path, "test-gcs-example-download.txt")
 )
 
 BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
 
+# Upload 'test-gcs-manual-example-upload.txt' manually in the <BUCKET_1> after triggering the DAG.

Review Comment:
   We can alleviate it a bit with a sleep (yes, the good old ugly sleep ;)). Run the sensors separately, and in the meantime trigger a sleep (5s will suffice) and then the upload task:
   
   ```
       chain(
           # TEST SETUP
           create_bucket,
           sleep,
           upload_file,
           # TEST BODY
           [gcs_object_exists, gcs_object_with_prefix_exists],
           # TEST TEARDOWN
           delete_bucket,
       )
       chain(
           create_bucket,
           # TEST BODY
           gcs_upload_session_complete,
           gcs_update_object_exists,
           delete_bucket
       )
   ```
   
   And of course put some explanation in the comments about why we're doing it. 
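   
   The `sleep` task itself isn't shown above; a minimal sketch, assuming a plain BashOperator (any operator that introduces a delay would do):
   
   ```
   from airflow.operators.bash import BashOperator
   
   # Gives the sensors a head start before the upload happens.
   sleep = BashOperator(task_id="sleep", bash_command="sleep 5")
   ```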





[GitHub] [airflow] kaxil commented on a diff in pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #22808:
URL: https://github.com/apache/airflow/pull/22808#discussion_r845012678


##########
airflow/example_dags/example_gcs.py:
##########
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This is an example dag for demonstrating usage of GCS sensors.
+"""
+import os
+from datetime import timedelta
+
+import pendulum
+
+from airflow import DAG
+from airflow.providers.google.cloud.sensors.gcs import GCSUploadSessionCompleteSensor, GCSObjectUpdateSensor
+
+TEST_BUCKET = os.getenv("GCP_TEST_BUCKET", "test-gcs-bucket")
+GCP_CONN_ID = os.getenv("GCP_CONN_ID", "google_cloud_default")
+
+# Upload example_test.txt in the <TEST_BUCKET> after triggering the DAG.
+BUCKET_FILE_LOCATION = "example_test.txt"
+
+# This is the upload file name prefix the sensor will be waiting for.
+PATH_TO_UPLOAD_FILE_PREFIX = "example_"
+
+DEFAULT_ARGS = {
+    "execution_timeout": timedelta(minutes=30),
+}
+
+with DAG(
+    "example_gcs_sensors",
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    schedule_interval=None,
+    catchup=False,
+    default_args=DEFAULT_ARGS,
+    tags=["example", "gcs"],
+) as dag:

Review Comment:
   ```suggestion
   with DAG(
       "example_gcs_sensors",
       start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
       schedule_interval=None,
       catchup=False,
       default_args={
           "execution_timeout": timedelta(minutes=30),
       },
       tags=["example", "gcs"],
   ) as dag:
   ```





[GitHub] [airflow] github-actions[bot] commented on pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22808:
URL: https://github.com/apache/airflow/pull/22808#issuecomment-1092821902

   The PR is likely OK to be merged with just a subset of tests for the default Python and database versions, without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full test matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.




[GitHub] [airflow] bhirsz commented on a diff in pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
bhirsz commented on code in PR #22808:
URL: https://github.com/apache/airflow/pull/22808#discussion_r848116867


##########
airflow/providers/google/cloud/example_dags/example_gcs.py:
##########
@@ -37,35 +37,44 @@
 from airflow.providers.google.cloud.sensors.gcs import (
     GCSObjectExistenceSensor,
     GCSObjectsWithPrefixExistenceSensor,
+    GCSObjectUpdateSensor,
+    GCSUploadSessionCompleteSensor,
 )
 from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
 from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 
 START_DATE = datetime(2021, 1, 1)
 
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
-BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
-GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
+PROJECT_ID = os.getenv("GCP_PROJECT_ID", "example-id")
+BUCKET_1 = os.getenv("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
+GCS_ACL_ENTITY = os.getenv("GCS_ACL_ENTITY", "allUsers")
 GCS_ACL_BUCKET_ROLE = "OWNER"
 GCS_ACL_OBJECT_ROLE = "OWNER"
 
-BUCKET_2 = os.environ.get("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
+BUCKET_2 = os.getenv("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
 
 temp_dir_path = gettempdir()
-PATH_TO_TRANSFORM_SCRIPT = os.environ.get(
+PATH_TO_TRANSFORM_SCRIPT = os.getenv(
     "GCP_GCS_PATH_TO_TRANSFORM_SCRIPT", os.path.join(temp_dir_path, "transform_script.py")
 )
-PATH_TO_UPLOAD_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE = os.getenv(
     "GCP_GCS_PATH_TO_UPLOAD_FILE", os.path.join(temp_dir_path, "test-gcs-example-upload.txt")
 )
-PATH_TO_UPLOAD_FILE_PREFIX = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
-PATH_TO_SAVED_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE_PREFIX = os.getenv("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
+PATH_TO_SAVED_FILE = os.getenv(
     "GCP_GCS_PATH_TO_SAVED_FILE", os.path.join(temp_dir_path, "test-gcs-example-download.txt")
 )
 
 BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
 
+# Upload 'test-gcs-manual-example-upload.txt' manually in the <BUCKET_1> after triggering the DAG.

Review Comment:
   Yeah, we had two goals with the migration: to make writing system tests easier, and to actually ensure that our example DAGs are runnable (what would be the point of an example that doesn't work? and that's actually often the case ;)). In the old design, system tests "wrapped" and ran the examples, but not all example DAGs were used by system tests. In the new design we're still using the examples as system tests, but without separating them into different files: the example DAG now is the system test itself.
   
   Any change is highly welcome and it's good to see PRs like yours - congrats! We only missed the notification on the PR (sadly, GitHub doesn't allow for an advanced notification system), so we were not able to have this discussion before merging it.
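   
   For context, in the new design an example DAG file typically ends with a bit of boilerplate so pytest can run it as a system test - a rough sketch (the `tests.system.utils` module paths are an assumption here, based on the AIP-47 layout):
   
   ```python
   # Sketch: boilerplate that turns an example DAG into a runnable system test.
   from tests.system.utils.watcher import watcher

   # The watcher task propagates teardown failures to the overall test status.
   list(dag.tasks) >> watcher()

   from tests.system.utils import get_test_run  # noqa: E402

   # Needed so pytest can discover and run the example DAG as a test.
   test_run = get_test_run(dag)
   ```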





[GitHub] [airflow] kaxil commented on a diff in pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #22808:
URL: https://github.com/apache/airflow/pull/22808#discussion_r845016710


##########
airflow/example_dags/example_gcs.py:
##########
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This is an example dag for demonstrating usage of GCS sensors.
+"""
+import os
+from datetime import timedelta
+
+import pendulum
+
+from airflow import DAG
+from airflow.providers.google.cloud.sensors.gcs import GCSUploadSessionCompleteSensor, GCSObjectUpdateSensor
+
+TEST_BUCKET = os.getenv("GCP_TEST_BUCKET", "test-gcs-bucket")
+GCP_CONN_ID = os.getenv("GCP_CONN_ID", "google_cloud_default")
+
+# Upload example_test.txt in the <TEST_BUCKET> after triggering the DAG.
+BUCKET_FILE_LOCATION = "example_test.txt"
+
+# This is the upload file name prefix the sensor will be waiting for.
+PATH_TO_UPLOAD_FILE_PREFIX = "example_"
+
+DEFAULT_ARGS = {
+    "execution_timeout": timedelta(minutes=30),
+}
+
+with DAG(
+    "example_gcs_sensors",
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    schedule_interval=None,
+    catchup=False,
+    default_args=DEFAULT_ARGS,
+    tags=["example", "gcs"],
+) as dag:
+
+    # [START howto_sensor_gcs_upload_session_complete_task]

Review Comment:
   Should we add this to https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/gcs.html ?
   
   You can add it in https://github.com/apache/airflow/blob/main/docs/apache-airflow-providers-google/operators/cloud/gcs.rst, which will show up on the page linked above when a new version of the provider is released.
   
   You can also build the docs locally, as described in https://github.com/apache/airflow/blob/main/BREEZE.rst#building-the-documentation:
   
   ```
   ./breeze build-docs -- --package-filter apache-airflow-providers-google
   ```
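   
   For the doc snippet itself, the section between the START/END markers could look roughly like this - a sketch reusing the constants defined earlier in the file (the marker names and parameter values are illustrative, not prescriptive):
   
   ```python
   # [START howto_sensor_gcs_upload_session_complete_task]
   gcs_upload_session_complete = GCSUploadSessionCompleteSensor(
       bucket=TEST_BUCKET,
       prefix=PATH_TO_UPLOAD_FILE_PREFIX,
       inactivity_period=60,  # seconds without new uploads before the session counts as complete
       min_objects=1,
       allow_delete=True,
       task_id="gcs_upload_session_complete_task",
   )
   # [END howto_sensor_gcs_upload_session_complete_task]

   # [START howto_sensor_gcs_object_update_task]
   gcs_update_object_exists = GCSObjectUpdateSensor(
       bucket=TEST_BUCKET,
       object=BUCKET_FILE_LOCATION,
       task_id="gcs_object_update_sensor_task",
   )
   # [END howto_sensor_gcs_object_update_task]
   ```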





[GitHub] [airflow] kaxil merged pull request #22808: Add example DAG for demonstrating usage of GCS sensors

Posted by GitBox <gi...@apache.org>.
kaxil merged PR #22808:
URL: https://github.com/apache/airflow/pull/22808

