Posted to commits@airflow.apache.org by "1inuxoid (via GitHub)" <gi...@apache.org> on 2023/02/09 22:14:20 UTC

[GitHub] [airflow] 1inuxoid opened a new pull request, #29452: Add support of a different AWS connection for DynamoDB

1inuxoid opened a new pull request, #29452:
URL: https://github.com/apache/airflow/pull/29452

   This change adds a new optional argument `dynamodb_conn_id` to `DynamoDBToS3Operator` so that a separate AWS connection can be used to scan a DynamoDB table. If not specified, the connection from `aws_conn_id` is used, as before.
   
   This makes the operator useful for cross-account transfers.
   
   closes: #29422 
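   
   For illustration only, a minimal sketch of how a cross-account transfer could look with the source/destination connection parameters this PR ends up introducing; the connection IDs, table and bucket names below are hypothetical:
   
   ```python
   # Hypothetical usage sketch; connection IDs, table and bucket names are made up,
   # and the parameter names assume the final shape of this PR (source_aws_conn_id /
   # dest_aws_conn_id, with dest falling back to source when omitted).
   from datetime import datetime

   from airflow import DAG
   from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

   with DAG(dag_id="dynamodb_to_s3_cross_account", start_date=datetime(2023, 1, 1), schedule=None) as dag:
       DynamoDBToS3Operator(
           task_id="backup_table",
           dynamodb_table_name="my_table",           # table in the source account
           source_aws_conn_id="aws_source_account",  # credentials used to scan DynamoDB
           dest_aws_conn_id="aws_dest_account",      # credentials used to write to S3
           s3_bucket_name="my-backup-bucket",
           file_size=1000,
       )
   ```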
   




[GitHub] [airflow] 1inuxoid commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1114113640


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,10 +131,18 @@ def __init__(
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
-        self.aws_conn_id = aws_conn_id
+        if aws_conn_id is not NOTSET:
+            warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
+            self.source_aws_conn_id = aws_conn_id
+        else:
+            self.source_aws_conn_id = source_aws_conn_id
+        if dest_aws_conn_id is NOTSET:
+            self.dest_aws_conn_id = self.source_aws_conn_id
+        else:
+            self.dest_aws_conn_id = dest_aws_conn_id

Review Comment:
   Thanks @Taragolis, I added a few more tests for the most important (all?) remaining combinations.





[GitHub] [airflow] Taragolis commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1126144359


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -135,7 +156,7 @@ def execute(self, context: Context) -> None:
                 raise e
             finally:
                 if err is None:
-                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, self.aws_conn_id)
+                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, str(self.dest_aws_conn_id))

Review Comment:
   `None` is a valid value for `aws_conn_id`, so you can't use `None` to distinguish the deprecated parameter from the current one; you should use NOTSET or another sentinel.
   
   And the main thing: you need to resolve this before passing any values upstream.





[GitHub] [airflow] 1inuxoid commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1102453228


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -103,6 +106,7 @@ def __init__(
         self,
         *,
         dynamodb_table_name: str,
+        dynamodb_conn_id: str | None = None,

Review Comment:
   Thanks for this feedback, @Taragolis, it's very much appreciated as I'm quite new to the Airflow codebase. 





[GitHub] [airflow] 1inuxoid commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1109804748


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,10 +131,18 @@ def __init__(
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
-        self.aws_conn_id = aws_conn_id
+        if aws_conn_id is not NOTSET:
+            warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
+            self.source_aws_conn_id = aws_conn_id
+        else:
+            self.source_aws_conn_id = source_aws_conn_id

Review Comment:
   I personally feel that if you're getting a deprecation warning when you use both, that should be helpful enough, and we do want people to migrate away from `aws_conn_id` eventually.





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "boring-cyborg[bot] (via GitHub)" <gi...@apache.org>.
boring-cyborg[bot] commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1462886157

   Awesome work, congrats on your first merged pull request!
   




[GitHub] [airflow] 1inuxoid commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1109807591


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -32,11 +33,17 @@
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.utils.types import NOTSET, ArgNotSet
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
 
+_DEPRECATION_MSG = (
+    "The aws_conn_id parameter has been deprecated. You should pass instead the source_aws_conn_id parameter."

Review Comment:
   I think that since we are not dropping it, and the new code matches the behaviour of the old one if you only use `aws_conn_id` (we are just adding a deprecation warning), we are not twisting anyone's hands too much.
   On the other hand, keeping all 3 connection parameters while only ever using at most 2 of them is not very clean, IMO.





[GitHub] [airflow] Taragolis commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1109372737


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,10 +131,18 @@ def __init__(
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
-        self.aws_conn_id = aws_conn_id
+        if aws_conn_id is not NOTSET:
+            warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
+            self.source_aws_conn_id = aws_conn_id
+        else:
+            self.source_aws_conn_id = source_aws_conn_id
+        if dest_aws_conn_id is NOTSET:
+            self.dest_aws_conn_id = self.source_aws_conn_id
+        else:
+            self.dest_aws_conn_id = dest_aws_conn_id

Review Comment:
   Because this sentinel implements neither `__bool__` nor `__len__`, it evaluates to True.
   And if `dest_aws_conn_id` is `None`, then it is replaced by `self.source_aws_conn_id`; however
   
   https://github.com/apache/airflow/blob/17e8bb7f9e320c97fd737f8786a6b16515f4810e/airflow/providers/amazon/aws/hooks/base_aws.py#L379-L388
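   
   For illustration, a tiny check of the point above (truthiness cannot be used to detect the sentinel):
   
   ```python
   # The sentinel defines neither __bool__ nor __len__, so it is truthy,
   # and `if dest_aws_conn_id:` cannot detect it.
   from airflow.utils.types import NOTSET, ArgNotSet

   assert bool(NOTSET) is True
   # Detect it explicitly instead:
   assert isinstance(NOTSET, ArgNotSet)
   ```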








[GitHub] [airflow] 1inuxoid commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1126133221


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -135,7 +156,7 @@ def execute(self, context: Context) -> None:
                 raise e
             finally:
                 if err is None:
-                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, self.aws_conn_id)
+                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, str(self.dest_aws_conn_id))

Review Comment:
   @Taragolis I wanted to make it clear that I'm blocked here 😅 





[GitHub] [airflow] dstandish commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "dstandish (via GitHub)" <gi...@apache.org>.
dstandish commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1426490755

   I agree; I think it's best to leave `aws_conn_id` alone and simply add an optional S3 conn ID for optionally different credentials for the bucket.
   This is fully backward compatible and intuitive behavior.




[GitHub] [airflow] boring-cyborg[bot] commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "boring-cyborg[bot] (via GitHub)" <gi...@apache.org>.
boring-cyborg[bot] commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1424905565

   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
   Here are some useful points:
   - Pay attention to the quality of your code (ruff, mypy and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature, add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using the [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   




[GitHub] [airflow] 1inuxoid commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1130954044


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -135,7 +156,7 @@ def execute(self, context: Context) -> None:
                 raise e
             finally:
                 if err is None:
-                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, self.aws_conn_id)
+                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, str(self.dest_aws_conn_id))

Review Comment:
   Hey @Taragolis, thank you for your suggestions, I happily included them. Sorry for not noticing test failures.





[GitHub] [airflow] ferruzzi commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "ferruzzi (via GitHub)" <gi...@apache.org>.
ferruzzi commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1131381484


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -78,16 +89,20 @@ class DynamoDBToS3Operator(BaseOperator):
         :ref:`howto/transfer:DynamoDBToS3Operator`
 
     :param dynamodb_table_name: Dynamodb table to replicate data from
+    :param source_aws_conn_id: The Airflow connection used for AWS credentials
+        to access DynamoDB. If this is None or empty then the default boto3
+        behaviour is used. If running Airflow in a distributed manner and
+        source_aws_conn_id is None or empty, then default boto3 configuration
+        would be used (and must be maintained on each worker node).
     :param s3_bucket_name: S3 bucket to replicate data to
     :param file_size: Flush file to s3 if file size >= file_size
     :param dynamodb_scan_kwargs: kwargs pass to <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
     :param s3_key_prefix: Prefix of s3 object key
     :param process_func: How we transforms a dynamodb item to bytes. By default we dump the json
-    :param aws_conn_id: The Airflow connection used for AWS credentials.
-        If this is None or empty then the default boto3 behaviour is used. If
-        running Airflow in a distributed manner and aws_conn_id is None or
-        empty, then default boto3 configuration would be used (and must be
-        maintained on each worker node).
+    :param dest_aws_conn_id: The Airflow connection used for AWS credentials
+        to access S3. If this is not set then the source_aws_conn_id connection is used.
+    :param aws_conn_id: The Airflow connection used for AWS credentials (deprecated).

Review Comment:
   ```suggestion
       :param aws_conn_id: The Airflow connection used for AWS credentials (deprecated; use source_aws_conn_id).
   ```



##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -78,16 +89,20 @@ class DynamoDBToS3Operator(BaseOperator):
         :ref:`howto/transfer:DynamoDBToS3Operator`
 
     :param dynamodb_table_name: Dynamodb table to replicate data from
+    :param source_aws_conn_id: The Airflow connection used for AWS credentials
+        to access DynamoDB. If this is None or empty then the default boto3
+        behaviour is used. If running Airflow in a distributed manner and
+        source_aws_conn_id is None or empty, then default boto3 configuration
+        would be used (and must be maintained on each worker node).
     :param s3_bucket_name: S3 bucket to replicate data to
     :param file_size: Flush file to s3 if file size >= file_size
     :param dynamodb_scan_kwargs: kwargs pass to <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
     :param s3_key_prefix: Prefix of s3 object key
     :param process_func: How we transforms a dynamodb item to bytes. By default we dump the json
-    :param aws_conn_id: The Airflow connection used for AWS credentials.
-        If this is None or empty then the default boto3 behaviour is used. If
-        running Airflow in a distributed manner and aws_conn_id is None or
-        empty, then default boto3 configuration would be used (and must be
-        maintained on each worker node).
+    :param dest_aws_conn_id: The Airflow connection used for AWS credentials
+        to access S3. If this is not set then the source_aws_conn_id connection is used.
+    :param aws_conn_id: The Airflow connection used for AWS credentials (deprecated).
+
     """  # noqa: E501
 
     template_fields: Sequence[str] = (

Review Comment:
   We may as well add them to the template fields while we're at it.
   
   ```suggestion
       template_fields: Sequence[str] = (
           "source_aws_conn_id",
           "dest_aws_conn_id",
   ```





[GitHub] [airflow] ferruzzi commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "ferruzzi (via GitHub)" <gi...@apache.org>.
ferruzzi commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1433496214

   > @eladkal @ferruzzi please have a look at the changes at your convenience.
   
   I'll take a look today, but it looks like Taragolis did a pretty solid review already. :+1: 




[GitHub] [airflow] Taragolis commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1102125057


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -103,6 +106,7 @@ def __init__(
         self,
         *,
         dynamodb_table_name: str,
+        dynamodb_conn_id: str | None = None,

Review Comment:
   It is more about a lesser-known behaviour of boto3-based hooks: `None` is a legitimate value and forces the default boto3 credential strategy (without any call to Connections), e.g. env variables, IAM profile, ECS task role, etc.
   
   I think it is better to use a specific sentinel as the default value here instead of `None`
   
   https://github.com/apache/airflow/blob/a76e0fe16ef12749c3fea1b68d82936b238fafbb/airflow/utils/types.py#L28-L44
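   
   For illustration, a minimal sketch (not the PR's actual code) of how such a sentinel lets a constructor tell "argument not passed" apart from an explicit `None`:
   
   ```python
   # Illustrative sketch of the NOTSET/ArgNotSet pattern; the class and attribute names
   # are hypothetical, and the fallback rule mirrors what is discussed in this thread.
   from __future__ import annotations

   from airflow.utils.types import NOTSET, ArgNotSet


   class ExampleTransfer:
       def __init__(
           self,
           source_aws_conn_id: str | None = "aws_default",
           dest_aws_conn_id: str | None | ArgNotSet = NOTSET,
       ) -> None:
           self.source_aws_conn_id = source_aws_conn_id
           # NOTSET means "not passed at all"; an explicit None is preserved because
           # it legitimately selects boto3's default credential resolution.
           self.dest_aws_conn_id = (
               source_aws_conn_id if isinstance(dest_aws_conn_id, ArgNotSet) else dest_aws_conn_id
           )
   ```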





[GitHub] [airflow] 1inuxoid commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1109812718


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -103,12 +114,14 @@ def __init__(
         self,
         *,
         dynamodb_table_name: str,
+        source_aws_conn_id: str = "aws_default",

Review Comment:
   On the one hand I agree that this typing would make more sense; however, the old implementation had `aws_conn_id` typed as `str`, so people would just have ignored the typing error messages when they used `None` as a value? Contrary to the documentation, it only supported an empty string before.





[GitHub] [airflow] 1inuxoid commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1428636726

   > I am curious to hear what kind of percentage of users have cross-account workflows like this.
   
   I think this should be a rather typical use case. Many companies use multiple accounts to isolate services and simplify cost attribution.
   
   Another typical use case is data lakes built on top of S3, which live separately from all the micro-services that use DynamoDB.
   
   We don't have a single intra-account transfer for this operator.




[GitHub] [airflow] 1inuxoid commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1130770356


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -135,7 +156,7 @@ def execute(self, context: Context) -> None:
                 raise e
             finally:
                 if err is None:
-                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, self.aws_conn_id)
+                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, str(self.dest_aws_conn_id))

Review Comment:
   Correct, to be able to remove the type casting, which wasn't correct (it would produce the string `"None"` from a `None` value).





[GitHub] [airflow] Taragolis commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1130767727


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -135,7 +156,7 @@ def execute(self, context: Context) -> None:
                 raise e
             finally:
                 if err is None:
-                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, self.aws_conn_id)
+                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, str(self.dest_aws_conn_id))

Review Comment:
   I guess this is to avoid `mypy` complaining in static checks?





[GitHub] [airflow] Taragolis commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1108381943


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -103,12 +114,14 @@ def __init__(
         self,
         *,
         dynamodb_table_name: str,
+        source_aws_conn_id: str = "aws_default",
         s3_bucket_name: str,
         file_size: int,
         dynamodb_scan_kwargs: dict[str, Any] | None = None,
         s3_key_prefix: str = "",
         process_func: Callable[[dict[str, Any]], bytes] = _convert_item_to_json_bytes,
-        aws_conn_id: str = "aws_default",
+        dest_aws_conn_id: ArgNotSet | str = NOTSET,
+        aws_conn_id: ArgNotSet | str = NOTSET,

Review Comment:
   ```suggestion
           dest_aws_conn_id: str | None | ArgNotSet  = NOTSET,
           aws_conn_id: str | None | ArgNotSet = NOTSET,
   ```



##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,10 +131,18 @@ def __init__(
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
-        self.aws_conn_id = aws_conn_id
+        if aws_conn_id is not NOTSET:
+            warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
+            self.source_aws_conn_id = aws_conn_id
+        else:
+            self.source_aws_conn_id = source_aws_conn_id
+        if dest_aws_conn_id is NOTSET:
+            self.dest_aws_conn_id = self.source_aws_conn_id
+        else:
+            self.dest_aws_conn_id = dest_aws_conn_id

Review Comment:
   "Tests Police" here 🤣 , we need also test this one. Just sure that we cover all cases with different inputs of `aws_conn_id`, `source_aws_conn_id`, `dest_aws_conn_id`, especially (but not limited) cases which is possible for current version of provider:
   1. User not provide anything, for example user expected to use `aws_default` connection 🙄 
   2. User provide value only to `aws_conn_id`
   
   Might be something like this:
   
   https://github.com/apache/airflow/blob/16fddbae83d03c9b3e2d249cc8852fb006c65c3b/tests/providers/slack/hooks/test_slack_webhook.py#L194-L211
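   
   For illustration, a sketch of the kind of parametrized test meant here; the expected values follow the fallback rules discussed in this thread, and a real test would also assert the `DeprecationWarning` for the legacy case:
   
   ```python
   # Illustrative parametrized test; not the exact test added in this PR.
   import pytest

   from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator


   @pytest.mark.parametrize(
       "conn_kwargs, expected_source, expected_dest",
       [
           ({}, "aws_default", "aws_default"),  # nothing provided
           ({"aws_conn_id": "legacy"}, "legacy", "legacy"),  # deprecated parameter only
           ({"source_aws_conn_id": "src"}, "src", "src"),  # dest falls back to source
           ({"source_aws_conn_id": "src", "dest_aws_conn_id": "dst"}, "src", "dst"),
       ],
   )
   def test_connection_resolution(conn_kwargs, expected_source, expected_dest):
       op = DynamoDBToS3Operator(
           task_id="dynamodb_to_s3",
           dynamodb_table_name="table",
           s3_bucket_name="bucket",
           file_size=1000,
           **conn_kwargs,
       )
       assert op.source_aws_conn_id == expected_source
       assert op.dest_aws_conn_id == expected_dest
   ```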



##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -103,12 +114,14 @@ def __init__(
         self,
         *,
         dynamodb_table_name: str,
+        source_aws_conn_id: str = "aws_default",

Review Comment:
   ```suggestion
           source_aws_conn_id: str | None = "aws_default",
   ```



##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -135,7 +156,7 @@ def execute(self, context: Context) -> None:
                 raise e
             finally:
                 if err is None:
-                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, self.aws_conn_id)
+                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, str(self.dest_aws_conn_id))

Review Comment:
   Why do we need the string representation of `self.dest_aws_conn_id` here?
   





[GitHub] [airflow] Taragolis merged pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis merged PR #29452:
URL: https://github.com/apache/airflow/pull/29452




[GitHub] [airflow] Taragolis commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1102515113


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -103,6 +106,7 @@ def __init__(
         self,
         *,
         dynamodb_table_name: str,
+        dynamodb_conn_id: str | None = None,

Review Comment:
   Well, this behaviour is not well documented - only the docstrings mention that `aws_conn_id` (which is propagated to AwsBaseHook) can accept `None`, which internally falls back to a plain `boto3.session.Session(region_name=region_name)`. We added some info about other very odd historical behaviour in the documentation for the connection (see the main branch [docs](http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com/docs/apache-airflow-providers-amazon/latest/connections/aws.html#default-connection-ids)), which also mentions the `None` behaviour.
   
   In the Amazon provider there are still a lot of inconsistencies between components, especially in operators and hooks designed in Airflow 1.x; however, with every release the situation gets better and better.
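   
   As a small illustration of the `None` behaviour mentioned above (a sketch, not provider documentation):
   
   ```python
   # Passing aws_conn_id=None skips the Airflow connection lookup entirely and falls
   # back to boto3's default credential chain (env variables, shared config,
   # instance/task role, ...); a string looks up an Airflow connection as usual.
   from airflow.providers.amazon.aws.hooks.s3 import S3Hook

   hook_from_connection = S3Hook(aws_conn_id="aws_default")  # resolves an Airflow connection
   hook_from_boto3_defaults = S3Hook(aws_conn_id=None)       # plain boto3 session defaults
   ```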
   
   
   





[GitHub] [airflow] 1inuxoid commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1428732998

   You're right, there is `dest_aws_conn_id` in [GCSToS3Operator](https://github.com/apache/airflow/blob/0f98bdda6a96cdd140ada380c3a10ef722bb19b2/airflow/providers/amazon/aws/transfers/gcs_to_s3.py#L105), so I'll adhere to that pattern. 
   
   Do we deprecate `aws_conn_id`?




[GitHub] [airflow] eladkal commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1426514058

   > What would be the purpose of such generalisation? Out of all transfer operators only three of them operate entirely within AWS: DynamoDBToS3Operator RedshiftToS3Operator and S3ToRedshiftOperator. Another 15 have just one AWS connection, which is specified with aws_conn_id.
   
   >We could even go as far as generalising all transfer operators, as there is always an in and an out with them, but that sounds like a major revamp. What does everyone think? How can we make the most practical first step?
   
   
   But why? My initial thought was to generalize only the AWS-to-AWS transfer operators.
   There is no need to handle the non-AWS ones, because there it's very clear which is the input conn and which is the output conn.
   `HiveToDynamoDBOperator` has `hiveserver2_conn_id` and `aws_conn_id`; you know what is in/out without adding that to the parameter names. The issue we are discussing is relevant only for AWS-to-AWS transfers, where it's not clear.
   
   We can have `source_aws_conn_id` and `dest_aws_conn_id`, where the dest defaults to the source value. Only when the user overrides the default will it be the case of 2 distinct accounts.
   




[GitHub] [airflow] dstandish commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "dstandish (via GitHub)" <gi...@apache.org>.
dstandish commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1426099946

   I would like to suggest that you add "s3_conn_id" instead of "dynamodb_conn_id". Otherwise this ends up with quite unintuitive behavior IMO.
   
   Because when a user sees "dynamodb_conn_id" they will likely assume that's the one they should use. So then I expect that a user will commonly and inadvertently do a transfer between the DynamoDB connection ID and the AWS connection ID.
   
   But if, on the other hand, you add an S3 conn ID, then the user can generally leave that as None, and the AWS conn_id will always be the one that's used for DynamoDB.




[GitHub] [airflow] 1inuxoid commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1426412030

   > > `aws_conn_id_in`, `aws_conn_id_out` ??? and deprecate `aws_conn_id` for transfers operators within AWS?
   > 
   > I'd maybe leave the existing aws_conn_id and make the new way an option with some checks to assert `aws_conn_id XOR (aws_conn_id_in and aws_conn_id_out)`. I don't think adding the option to specify an _out should require folks to update their existing code.
   
   I share the consideration about current usage of the operator, and I agree that we should definitely leave people who are happy with using just one AWS connection untouched. However, the idea of introducing two more connection parameters, so that there would never be a case when all connections are used at the same time, sounds quite weird to me.
   
   What would be the purpose of such generalisation? Out of all the transfer operators only three of them operate entirely within AWS: `DynamoDBToS3Operator`, `RedshiftToS3Operator` and `S3ToRedshiftOperator`. Another 15 have just one AWS connection, which is specified with `aws_conn_id`.
   
   We could even go as far as generalising all transfer operators, as there is always an **in** and an **out** with them, but that sounds like a major revamp. What does everyone think? How can we make the most practical first step?
   
   




[GitHub] [airflow] 1inuxoid commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1109842430


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,10 +131,18 @@ def __init__(
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
-        self.aws_conn_id = aws_conn_id
+        if aws_conn_id is not NOTSET:
+            warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
+            self.source_aws_conn_id = aws_conn_id
+        else:
+            self.source_aws_conn_id = source_aws_conn_id
+        if dest_aws_conn_id is NOTSET:
+            self.dest_aws_conn_id = self.source_aws_conn_id
+        else:
+            self.dest_aws_conn_id = dest_aws_conn_id

Review Comment:
   @Taragolis I need your advice here please.
   None of the other AWS test suites seem to do any assertions on the deprecation warning message (i.e. `parameter has been deprecated`) (3 other operators use it: `imap_attachment_to_s3`, `s3_to_sftp`, `mongo_to_s3`),
   and my code doesn't raise an Exception.
   
   Do you still think it's necessary to assert that the warning is present? If so, why wasn't it done this way in any of those other test suites?





[GitHub] [airflow] ferruzzi commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "ferruzzi (via GitHub)" <gi...@apache.org>.
ferruzzi commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1110319799


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,10 +131,18 @@ def __init__(
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
-        self.aws_conn_id = aws_conn_id
+        if aws_conn_id is not NOTSET:
+            warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
+            self.source_aws_conn_id = aws_conn_id
+        else:
+            self.source_aws_conn_id = source_aws_conn_id
+        if dest_aws_conn_id is NOTSET:
+            self.dest_aws_conn_id = self.source_aws_conn_id
+        else:
+            self.dest_aws_conn_id = dest_aws_conn_id

Review Comment:
   My mistake. I wonder if we should some day add a `__bool__` to it so we can use it that way. But that's not really relevant now, and I suspect there would be a bit of discussion about whether NOTSET should be truthy or falsy. `if NOTSET` should resolve to true, I guess, but having it falsy makes assignments like the one I suggested really nice.
   
   Anyway... my mistake, feel free to resolve this one.





[GitHub] [airflow] 1inuxoid commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1120375563


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -135,7 +156,7 @@ def execute(self, context: Context) -> None:
                 raise e
             finally:
                 if err is None:
-                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, self.aws_conn_id)
+                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, str(self.dest_aws_conn_id))

Review Comment:
   TBH, at this point I don't understand why you suggest the `NOTSET` sentinel as a default fallback for the new connections instead of using the default fallback of `aws_default`, as is widely done throughout this provider? 😅
   
   It would still support passing the `None` value through to `boto3`, right?





[GitHub] [airflow] Taragolis commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1426107050

   `aws_conn_id_in`, `aws_conn_id_out`??? And deprecate `aws_conn_id` for transfer operators within AWS?




[GitHub] [airflow] 1inuxoid commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1428691255

   Sorry, @eladkal, I didn't quite understand you; you suggest checking the connections in other transfer operators for what exactly?




[GitHub] [airflow] 1inuxoid commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1109799460


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -135,7 +156,7 @@ def execute(self, context: Context) -> None:
                 raise e
             finally:
                 if err is None:
-                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, self.aws_conn_id)
+                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, str(self.dest_aws_conn_id))

Review Comment:
   Because of the signature of `_upload_file_to_s3`, which expects `aws_conn_id` to be a string [here](https://github.com/apache/airflow/pull/29452/files/150b2a0407d066bbe15a764ada315718f88615fa#diff-5a4b3715577fad0a7bbf023f861a5f4f1e0fb1abb4b2a71ecb804a068d9de4edR62).
   
   As you can see, `aws_conn_id` was always defined as type `str`.





[GitHub] [airflow] 1inuxoid commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1109826014


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,10 +131,18 @@ def __init__(
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
-        self.aws_conn_id = aws_conn_id
+        if aws_conn_id is not NOTSET:
+            warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
+            self.source_aws_conn_id = aws_conn_id
+        else:
+            self.source_aws_conn_id = source_aws_conn_id
+        if dest_aws_conn_id is NOTSET:
+            self.dest_aws_conn_id = self.source_aws_conn_id
+        else:
+            self.dest_aws_conn_id = dest_aws_conn_id

Review Comment:
   To be fair, there wasn't a test for the `aws_default` value before 😆 and so far only `test_gcs_to_s3.py` had such a test case.
   But it's alright, I can add such a test and the ones for all the other connection combinations.





[GitHub] [airflow] Taragolis commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1110342885


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,10 +131,18 @@ def __init__(
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
-        self.aws_conn_id = aws_conn_id
+        if aws_conn_id is not NOTSET:
+            warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
+            self.source_aws_conn_id = aws_conn_id
+        else:
+            self.source_aws_conn_id = source_aws_conn_id
+        if dest_aws_conn_id is NOTSET:
+            self.dest_aws_conn_id = self.source_aws_conn_id
+        else:
+            self.dest_aws_conn_id = dest_aws_conn_id

Review Comment:
   `NOTSET` is just a sentinel for the case when we can't use `None`. And since community providers have a minimum supported Airflow of 2.3, we can safely use it in all places, rather than implementing it per provider.





[GitHub] [airflow] 1inuxoid commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1120345887


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -135,7 +156,7 @@ def execute(self, context: Context) -> None:
                 raise e
             finally:
                 if err is None:
-                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, self.aws_conn_id)
+                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, str(self.dest_aws_conn_id))

Review Comment:
   @Taragolis if I were to change the signature of `_upload_file_to_s3` to extend the type of `aws_conn_id` to `str | ArgNotSet | None` to match `dest_aws_conn_id`, I have a problem with `S3Hook`, which only expects `str | None`, and we're passing `dest_aws_conn_id` there.
   
   I tried this inside the constructor, but that didn't help me:
   ```python
           if dest_aws_conn_id is NOTSET:
               self.dest_aws_conn_id = self.source_aws_conn_id
           elif isinstance(dest_aws_conn_id, str):
               self.dest_aws_conn_id = dest_aws_conn_id
           else:
               self.dest_aws_conn_id = None
   ```
   
   Strangely enough, the `ArgNotSet` part of the type still escapes. Would you have any suggestions?
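   
   For illustration, one way (a sketch, close to what is suggested elsewhere in this thread) to keep the resolved value typed as `str | None` is to do the narrowing in a single `isinstance`-based expression:
   
   ```python
   # Illustrative sketch: isinstance() narrows ArgNotSet away, so mypy infers
   # `str | None` for the resolved value and S3Hook's `str | None` signature is satisfied.
   from __future__ import annotations

   from airflow.utils.types import NOTSET, ArgNotSet


   def resolve_dest_conn_id(
       source_aws_conn_id: str | None,
       dest_aws_conn_id: str | None | ArgNotSet = NOTSET,
   ) -> str | None:
       return source_aws_conn_id if isinstance(dest_aws_conn_id, ArgNotSet) else dest_aws_conn_id
   ```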





[GitHub] [airflow] Taragolis commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1130848667


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,10 +138,16 @@ def __init__(
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
-        self.aws_conn_id = aws_conn_id
+        if aws_conn_id is not NOTSET:

Review Comment:
   ```suggestion
           if not isinstance(aws_conn_id, ArgNotSet):
   ```



##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -52,9 +60,15 @@ def _convert_item_to_json_bytes(item: dict[str, Any]) -> bytes:
 
 
 def _upload_file_to_s3(
-    file_obj: IO, bucket_name: str, s3_key_prefix: str, aws_conn_id: str = "aws_default"
+    file_obj: IO,
+    bucket_name: str,
+    s3_key_prefix: str,
+    aws_conn_id: str | None | ArgNotSet = AwsBaseHook.default_conn_name,

Review Comment:
   ```suggestion
       aws_conn_id: str | None = AwsBaseHook.default_conn_name,
   ```



##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -52,9 +60,15 @@ def _convert_item_to_json_bytes(item: dict[str, Any]) -> bytes:
 
 
 def _upload_file_to_s3(
-    file_obj: IO, bucket_name: str, s3_key_prefix: str, aws_conn_id: str = "aws_default"
+    file_obj: IO,
+    bucket_name: str,
+    s3_key_prefix: str,
+    aws_conn_id: str | None | ArgNotSet = AwsBaseHook.default_conn_name,
 ) -> None:
-    s3_client = S3Hook(aws_conn_id=aws_conn_id).get_conn()
+    if isinstance(aws_conn_id, str) or aws_conn_id is None:
+        s3_client = S3Hook(aws_conn_id=aws_conn_id).get_conn()
+    else:
+        s3_client = S3Hook().get_conn()

Review Comment:
   ```suggestion
       s3_client = S3Hook(aws_conn_id=aws_conn_id).get_conn()
   ```



##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,10 +138,16 @@ def __init__(
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
-        self.aws_conn_id = aws_conn_id
+        if aws_conn_id is not NOTSET:
+            warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
+            self.source_aws_conn_id = aws_conn_id
+        else:
+            self.source_aws_conn_id = source_aws_conn_id
+        if dest_aws_conn_id is NOTSET:
+            self.dest_aws_conn_id = self.source_aws_conn_id

Review Comment:
   ```suggestion
           self.dest_aws_conn_id = (
               self.source_aws_conn_id if isinstance(dest_aws_conn_id, ArgNotSet) else dest_aws_conn_id
           )
   ```





[GitHub] [airflow] eladkal commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1426104193

   > I would like to suggest that you add "s3_conn_id" instead of "dynamodb_conn_id". Otherwise this end up with quite unintuitive behavior IMO.
   
   Wouldn't it be more confusing, as we removed the S3 conn type?
   https://github.com/apache/airflow/pull/25980




[GitHub] [airflow] eladkal commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1426120246

   > `aws_conn_id_in`, `aws_conn_id_out` ??? and deprecate `aws_conn_id` for transfers operators within AWS?
   
   This is pretty much what I had in mind for a base AWS transfer class.




[GitHub] [airflow] eladkal commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1428707323

   The parameter name. I'm pretty sure I already saw the `dest_` prefix somewhere




[GitHub] [airflow] ferruzzi commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "ferruzzi (via GitHub)" <gi...@apache.org>.
ferruzzi commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1428498889

   If I were to pick, I'd say `source_aws_conn_id`/`destination_aws_conn_id` or `aws_conn_id_in`/`aws_conn_id_out`. I think I like the idea of keeping the `aws_conn_id` part intact, with either a prefix or a suffix.
   
   I also still like the idea of having `aws_conn_id` remain a valid parameter when both values are the same, which keeps existing workflows from breaking and stays consistent with other operators and sensors for folks who don't use cross-account workflows.
   
   I am curious what percentage of users have cross-account workflows like this. I don't expect you to have an actual answer, just curious whether this is a large portion of the userbase or an outlier.
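   
   To make the two calling styles concrete, here is a rough sketch of what a DAG author might write (a sketch only: parameter names follow the source_/dest_ proposal in this PR, and the table, bucket, and connection ids are made up):
   
   ```python
   from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator
   
   # Single-account workflow: one connection covers both DynamoDB and S3.
   backup_same_account = DynamoDBToS3Operator(
       task_id="backup_same_account",
       dynamodb_table_name="my_table",        # hypothetical table
       s3_bucket_name="my-backup-bucket",     # hypothetical bucket
       file_size=1000,
       source_aws_conn_id="aws_default",
   )
   
   # Cross-account workflow: scan with one connection, upload with another.
   backup_cross_account = DynamoDBToS3Operator(
       task_id="backup_cross_account",
       dynamodb_table_name="my_table",
       s3_bucket_name="other-account-bucket",
       file_size=1000,
       source_aws_conn_id="aws_account_a",    # hypothetical connection ids
       dest_aws_conn_id="aws_account_b",
   )
   ```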




[GitHub] [airflow] 1inuxoid commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1431992437

   @eladkal @ferruzzi please have a look at the changes at your convenience.




[GitHub] [airflow] Taragolis commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1109871569


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -135,7 +156,7 @@ def execute(self, context: Context) -> None:
                 raise e
             finally:
                 if err is None:
-                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, self.aws_conn_id)
+                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, str(self.dest_aws_conn_id))

Review Comment:
   Ohhh, you could change the `_upload_file_to_s3` signature, no problem at all: it is a private function (so you can change it however you need), which for some reason was implemented outside of the operator back in Airflow 1.x, and `S3Hook` expects `str | None` for the connection id.
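   
   A rough sketch of what that simplification could look like (just the shape the comment describes, not the final code; the key layout in the upload call is illustrative):
   
   ```python
   from __future__ import annotations
   
   import os
   from typing import IO
   
   from airflow.providers.amazon.aws.hooks.s3 import S3Hook
   
   
   def _upload_file_to_s3(
       file_obj: IO,
       bucket_name: str,
       s3_key_prefix: str,
       aws_conn_id: str | None = "aws_default",
   ) -> None:
       # S3Hook already accepts str | None, so the resolved value can be passed straight through.
       s3_client = S3Hook(aws_conn_id=aws_conn_id).get_conn()
       file_obj.seek(0)
       s3_client.upload_file(
           Filename=file_obj.name,
           Bucket=bucket_name,
           Key=s3_key_prefix + os.path.basename(file_obj.name),  # illustrative key naming only
       )
   ```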
   
   





[GitHub] [airflow] eladkal commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1462741248

   I'm OK with it
   
   
   I still think that this problem is not localized to Dynamo.
   I think we should further explore the option to make it generic for other transfer operators.
   Something like
   
   ```
   class BaseAwsTransferOperator(BaseOperator):
   
       def __init__(
           self,
           *,
           source_aws_conn_id: str | None = AwsBaseHook.default_conn_name,
           dest_aws_conn_id: str | None | ArgNotSet = NOTSET,
           aws_conn_id: str | None | ArgNotSet = NOTSET,
           **kwargs,
       ) -> None:
           super().__init__(
               ...,
               **kwargs,
           )
   
   
   class DynamoDBToS3Operator(BaseAwsTransferOperator):
       ...
   
   
   class S3ToS3Operator(BaseAwsTransferOperator):
       ...
   ```
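   
   One way such a base class could resolve the three arguments, purely as a sketch on top of the outline above (the deprecation wording is a placeholder):
   
   ```python
   from __future__ import annotations
   
   import warnings
   
   from airflow.models import BaseOperator
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   from airflow.utils.types import NOTSET, ArgNotSet
   
   
   class BaseAwsTransferOperator(BaseOperator):
       def __init__(
           self,
           *,
           source_aws_conn_id: str | None = AwsBaseHook.default_conn_name,
           dest_aws_conn_id: str | None | ArgNotSet = NOTSET,
           aws_conn_id: str | None | ArgNotSet = NOTSET,
           **kwargs,
       ) -> None:
           super().__init__(**kwargs)
           # The legacy single-connection argument takes precedence over source_aws_conn_id, with a warning.
           if not isinstance(aws_conn_id, ArgNotSet):
               warnings.warn(
                   "aws_conn_id is deprecated; use source_aws_conn_id instead.",  # placeholder wording
                   DeprecationWarning,
                   stacklevel=2,
               )
               self.source_aws_conn_id = aws_conn_id
           else:
               self.source_aws_conn_id = source_aws_conn_id
           # The destination falls back to the source connection when not given.
           self.dest_aws_conn_id = (
               self.source_aws_conn_id if isinstance(dest_aws_conn_id, ArgNotSet) else dest_aws_conn_id
           )
   ```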




[GitHub] [airflow] ferruzzi commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "ferruzzi (via GitHub)" <gi...@apache.org>.
ferruzzi commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1109190072


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -78,16 +85,20 @@ class DynamoDBToS3Operator(BaseOperator):
         :ref:`howto/transfer:DynamoDBToS3Operator`
 
     :param dynamodb_table_name: Dynamodb table to replicate data from
+    :param source_aws_conn_id: The Airflow connection used for AWS credentials
+        to access DynamoDB. If this is None or empty then the default boto3
+        behaviour is used. If running Airflow in a distributed manner and
+        source_aws_conn_id is None or empty, then default boto3 configuration
+        would be used (and must be maintained on each worker node).
     :param s3_bucket_name: S3 bucket to replicate data to
     :param file_size: Flush file to s3 if file size >= file_size
     :param dynamodb_scan_kwargs: kwargs pass to <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
     :param s3_key_prefix: Prefix of s3 object key
     :param process_func: How we transforms a dynamodb item to bytes. By default we dump the json
-    :param aws_conn_id: The Airflow connection used for AWS credentials.
-        If this is None or empty then the default boto3 behaviour is used. If
-        running Airflow in a distributed manner and aws_conn_id is None or
-        empty, then default boto3 configuration would be used (and must be
-        maintained on each worker node).
+    :param dest_aws_conn_id: The Airflow connection used for AWS credentials
+        to access S3. If this is no set then the source_aws_conn_id connection is used.

Review Comment:
   ```suggestion
       :param dest_aws_conn_id: The Airflow connection used for AWS credentials
           to access S3. If this is not set then the source_aws_conn_id connection is used.
   ```



##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -32,11 +33,17 @@
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.utils.types import NOTSET, ArgNotSet
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
 
+_DEPRECATION_MSG = (
+    "The aws_conn_id parameter has been deprecated. You should pass instead the source_aws_conn_id parameter."

Review Comment:
   ```suggestion
       "The aws_conn_id parameter has been deprecated. Use the source_aws_conn_id parameter instead."
   ```
   
   I'm still not convinced that dropping aws_conn_id is the right answer here and would prefer to see it left in, but if consensus is to drop it then I guess source_ is a suitable name.



##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,10 +131,18 @@ def __init__(
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
-        self.aws_conn_id = aws_conn_id
+        if aws_conn_id is not NOTSET:
+            warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
+            self.source_aws_conn_id = aws_conn_id
+        else:
+            self.source_aws_conn_id = source_aws_conn_id
+        if dest_aws_conn_id is NOTSET:
+            self.dest_aws_conn_id = self.source_aws_conn_id
+        else:
+            self.dest_aws_conn_id = dest_aws_conn_id

Review Comment:
   You'd have to test it, but this may be easier as 
   
   ```suggestion
       self.dest_aws_conn_id = dest_aws_conn_id or self.source_aws_conn_id
   ```
   
   I'm pretty sure (but not positive) that NOTSET is falsy
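   
   That is worth verifying before relying on `or`; a quick check in a Python shell would be something like:
   
   ```python
   from airflow.utils.types import NOTSET, ArgNotSet
   
   # If this prints True, `dest_aws_conn_id or self.source_aws_conn_id` would not fall back as hoped,
   # and an explicit isinstance(dest_aws_conn_id, ArgNotSet) check is the safer route.
   print(bool(NOTSET))
   print(isinstance(NOTSET, ArgNotSet))
   ```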



##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,10 +131,18 @@ def __init__(
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
-        self.aws_conn_id = aws_conn_id
+        if aws_conn_id is not NOTSET:
+            warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
+            self.source_aws_conn_id = aws_conn_id
+        else:
+            self.source_aws_conn_id = source_aws_conn_id

Review Comment:
   Should there be an exception if `aws_conn_id` and `source_aws_conn_id` are both provided?  If not, I think it should be made pretty obvious which has priority.  Maybe the fact you are deprecating `aws_conn_id` is enough of a hint, but maybe not.  Thoughts?
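   
   If raising turns out to be the preferred behaviour, a guard along these lines could work (a sketch only: it assumes both arguments default to NOTSET, and the error wording is made up):
   
   ```python
   from airflow.utils.types import ArgNotSet
   
   
   def _reject_conflicting_conn_args(aws_conn_id, source_aws_conn_id) -> None:
       """Raise if both the deprecated and the new source argument were passed explicitly.
   
       Assumes both arguments default to NOTSET so that "not passed" can be told apart
       from an explicit None (None is itself a valid connection id value).
       """
       if not isinstance(aws_conn_id, ArgNotSet) and not isinstance(source_aws_conn_id, ArgNotSet):
           raise ValueError("Provide either aws_conn_id (deprecated) or source_aws_conn_id, not both.")
   ```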





[GitHub] [airflow] eladkal commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1428680356

   I suggest first checking other transfer operators. I think we already have a precedent for that (we are not obligated to do the same, but let's verify)




[GitHub] [airflow] 1inuxoid commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1427997115

   Alright, I feel like there is a consensus around having two in/out connections.
   I have two questions left :) 
   1. What about the argument of leaving `aws_conn_id` for people who only use one connection?
   2. How can we finalise the naming? It feels to me that `aws_source_conn_id` reads somewhat more easily than `source_aws_conn_id`, but that's just my opinion 😅 




[GitHub] [airflow] eladkal commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1426515000

   BTW this issue is not just AWS-specific.
   It's the same case for GCP-to-GCP transfers and Azure-to-Azure (or any other cloud provider)




[GitHub] [airflow] ferruzzi commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "ferruzzi (via GitHub)" <gi...@apache.org>.
ferruzzi commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1426187395

   > `aws_conn_id_in`, `aws_conn_id_out` ??? and deprecate `aws_conn_id` for transfers operators within AWS?
   
   I'd maybe leave the existing `aws_conn_id` and make the new way an option, with some checks to assert `aws_conn_id XOR (aws_conn_id_in and aws_conn_id_out)`. I don't think adding the option to specify an `_out` should require folks to update their existing code.
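   
   A sketch of that kind of guard, treating None as "not provided" for simplicity (even though None can itself be a legitimate connection value), with names following the _in/_out spelling quoted above:
   
   ```python
   from __future__ import annotations
   
   
   def _validate_conn_args(
       aws_conn_id: str | None,
       aws_conn_id_in: str | None,
       aws_conn_id_out: str | None,
   ) -> None:
       """Require either the single legacy argument or the in/out pair, never a mix of both."""
       legacy = aws_conn_id is not None
       pair = aws_conn_id_in is not None and aws_conn_id_out is not None
       if legacy == pair:  # both styles given, or neither
           raise ValueError("Set either aws_conn_id, or both aws_conn_id_in and aws_conn_id_out.")
   ```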




[GitHub] [airflow] dstandish commented on pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "dstandish (via GitHub)" <gi...@apache.org>.
dstandish commented on PR #29452:
URL: https://github.com/apache/airflow/pull/29452#issuecomment-1426112129

   > Wouldn't it be more confusing as we removed S3 conn type
   > #25980
   
   I don't think so, for the same reason that there isn't a "dynamodb" conn type. It's just "use this conn id for this service"; all of them are the AWS conn type.
   
   So, the aws conn id would by default be used for both DynamoDB and S3, but you could optionally override the S3 one by supplying it. That's my thinking.
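   
   In code, that default-plus-override idea is essentially a one-line resolution (a sketch assuming a hypothetical s3_conn_id override parameter):
   
   ```python
   from __future__ import annotations
   
   
   def resolve_conn_ids(aws_conn_id: str | None, s3_conn_id: str | None = None) -> tuple[str | None, str | None]:
       """Return (dynamodb_conn_id, s3_conn_id): one connection for both unless S3 is overridden."""
       return aws_conn_id, s3_conn_id if s3_conn_id is not None else aws_conn_id
   ```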




[GitHub] [airflow] Taragolis commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1109862762


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,10 +131,18 @@ def __init__(
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
-        self.aws_conn_id = aws_conn_id
+        if aws_conn_id is not NOTSET:
+            warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
+            self.source_aws_conn_id = aws_conn_id
+        else:
+            self.source_aws_conn_id = source_aws_conn_id
+        if dest_aws_conn_id is NOTSET:
+            self.dest_aws_conn_id = self.source_aws_conn_id
+        else:
+            self.dest_aws_conn_id = dest_aws_conn_id

Review Comment:
   That is just a sample; the test could be pretty simple.
   And you could check that the warning is raised with the [`pytest.warns`](https://docs.pytest.org/en/7.1.x/how-to/capture-warnings.html#warns) context manager; it is almost the same as `pytest.raises` 
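   
   A minimal test along those lines might look like this (a sketch: the match string just needs to be a substring of whatever _DEPRECATION_MSG ends up saying, and the table/bucket names are made up):
   
   ```python
   import pytest
   
   from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator
   
   
   def test_deprecated_aws_conn_id_warns_and_maps_to_source():
       with pytest.warns(DeprecationWarning, match="deprecated"):
           op = DynamoDBToS3Operator(
               task_id="dynamodb_to_s3",
               dynamodb_table_name="my_table",
               s3_bucket_name="my-bucket",
               file_size=1000,
               aws_conn_id="legacy_conn",  # deprecated argument
           )
       assert op.source_aws_conn_id == "legacy_conn"
       assert op.dest_aws_conn_id == "legacy_conn"
   ```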





[GitHub] [airflow] 1inuxoid commented on a diff in pull request #29452: Add support of a different AWS connection for DynamoDB

Posted by "1inuxoid (via GitHub)" <gi...@apache.org>.
1inuxoid commented on code in PR #29452:
URL: https://github.com/apache/airflow/pull/29452#discussion_r1130679194


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -135,7 +156,7 @@ def execute(self, context: Context) -> None:
                 raise e
             finally:
                 if err is None:
-                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, self.aws_conn_id)
+                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, str(self.dest_aws_conn_id))

Review Comment:
   Thanks @Taragolis, I solved it [this way](https://github.com/apache/airflow/pull/29452/files#diff-5a4b3715577fad0a7bbf023f861a5f4f1e0fb1abb4b2a71ecb804a068d9de4edR68-R71), I hope you will find it acceptable. :) 


