Posted to commits@airflow.apache.org by "josh-fell (via GitHub)" <gi...@apache.org> on 2023/02/16 01:52:41 UTC

[GitHub] [airflow] josh-fell commented on a diff in pull request #29548: Syedahsn/ec2 create terminate operators

josh-fell commented on code in PR #29548:
URL: https://github.com/apache/airflow/pull/29548#discussion_r1107924100


##########
airflow/providers/amazon/aws/operators/ec2.py:
##########
@@ -116,3 +116,139 @@ def execute(self, context: Context):
             target_state="stopped",
             check_interval=self.check_interval,
         )
+
+
+class EC2CreateInstanceOperator(BaseOperator):
+    """
+    Create and start an EC2 Instance using boto3
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EC2CreateInstanceOperator`
+
+    :param image_id: ID of the AMI used to create the instance.
+    :param max_count: Maximum number of instances to launch. Defaults to 1.
+    :param min_count: Minimum number of instances to launch. Defaults to 1.
+    :param aws_conn_id: AWS connection to use
+    :param region_name: AWS region name associated with the client.
+    :param poll_interval: Number of seconds to wait before attempting to
+        check state of instance. Only used if wait_for_completion is True. Default is 20.
+    :param max_attempts: Maximum number of attempts when checking state of instance.
+        Only used if wait_for_completion is True. Default is 20.
+    :param config: Dictionary for arbitrary parameters to the boto3 run_instances call.
+    :param wait_for_completion: If True, the operator will wait for the instance to be
+        in the `running` state before returning.
+    """
+
+    template_fields: Sequence[str] = (
+        "image_id",
+        "max_count",
+        "min_count",
+        "aws_conn_id",
+        "region_name",
+        "config",
+        "wait_for_completion",
+    )
+
+    def __init__(
+        self,
+        image_id: str,
+        max_count: int = 1,
+        min_count: int = 1,
+        aws_conn_id: str = "aws_default",
+        region_name: str | None = None,
+        poll_interval: int = 20,
+        max_attempts: int = 20,
+        config: dict | None = None,
+        wait_for_completion: bool = False,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.image_id = image_id
+        self.max_count = max_count
+        self.min_count = min_count
+        self.aws_conn_id = aws_conn_id
+        self.region_name = region_name
+        self.poll_interval = poll_interval
+        self.max_attempts = max_attempts
+        self.config = config or {}
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context):
+        ec2_hook = EC2Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name, api_type="client_type")
+        instances = ec2_hook.conn.run_instances(
+            ImageId=self.image_id,
+            MinCount=self.min_count,
+            MaxCount=self.max_count,
+            **self.config,
+        )["Instances"]
+        instance_ids = []
+        for instance in instances:
+            instance_ids.append(instance["InstanceId"])
+            self.log.info("Created EC2 instance %s", instance["InstanceId"])
+
+            if self.wait_for_completion:
+                ec2_hook.get_waiter("instance_running").wait(
+                    InstanceIds=[instance["InstanceId"]],
+                    WaiterConfig={
+                        "Delay": self.poll_interval,
+                        "MaxAttempts": self.max_attempts,
+                    },
+                )
+
+        return instance_ids
+
+
+class EC2TerminateInstanceOperator(BaseOperator):
+    """
+    Terminate an EC2 Instance using boto3
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EC2TerminateInstanceOperator`
+
+    :param instance_id: ID of the instance to be terminated.
+    :param aws_conn_id: AWS connection to use
+    :param region_name: AWS region name associated with the client.
+    :param poll_interval: Number of seconds to wait before attempting to
+        check state of instance. Only used if wait_for_completion is True. Default is 20.
+    :param max_attempts: Maximum number of attempts when checking state of instance.
+        Only used if wait_for_completion is True. Default is 20.
+    :param wait_for_completion: If True, the operator will wait for the instance to be
+        in the `terminated` state before returning.
+    """
+
+    template_fields: Sequence[str] = ("instance_id", "region_name", "aws_conn_id", "wait_for_completion")
+
+    def __init__(
+        self,
+        instance_id: str | list[str],
+        aws_conn_id: str = "aws_default",
+        region_name: str | None = None,
+        poll_interval: int = 20,
+        max_attempts: int = 20,
+        wait_for_completion: bool = False,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.instance_ids = [*instance_id]

Review Comment:
   This would unpack a single `instance_id` string into its individual characters. It should probably check the type of `instance_id` first (i.e., `str` or `list`) and normalize it to a list of instance IDs.
   ```suggestion
           self.instance_ids = [instance_id] if isinstance(instance_id, str) else instance_id
   ```
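   A standalone sketch of the problem and the normalization. The helper name `normalize_instance_ids` is hypothetical and not part of the PR. Unlike the suggestion, it returns a copy of a passed-in list so later changes do not leak back to the caller.

```python
from __future__ import annotations


def normalize_instance_ids(instance_id: str | list[str]) -> list[str]:
    """Return a list of instance IDs whether one ID or a list of IDs is given."""
    # A bare string must be wrapped, not unpacked: [*"i-0abc"] yields characters.
    if isinstance(instance_id, str):
        return [instance_id]
    # Copy so the operator never mutates the caller's list.
    return list(instance_id)


# The original unpacking splits a single ID into characters:
print([*"i-0abc"])  # ['i', '-', '0', 'a', 'b', 'c']
print(normalize_instance_ids("i-0abc"))  # ['i-0abc']
print(normalize_instance_ids(["i-0abc", "i-0def"]))  # ['i-0abc', 'i-0def']
```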
   



##########
airflow/providers/amazon/aws/operators/ec2.py:
##########
@@ -116,3 +116,139 @@ def execute(self, context: Context):
+class EC2TerminateInstanceOperator(BaseOperator):
+    """
+    Terminate an EC2 Instance using boto3

Review Comment:
   Same as the related comment on `EC2CreateInstanceOperator`: this operator _could_ terminate multiple instances. Users who read the Python API docs would likely want to know that upfront rather than having to read the source code.
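   A sketch of a docstring that states the multi-instance behavior upfront, attached to a hypothetical standalone helper rather than the operator itself. It assumes a boto3 EC2 client, using `terminate_instances(InstanceIds=...)` and boto3's built-in `instance_terminated` waiter. One API call handles every ID, and one waiter call covers all of them.

```python
from __future__ import annotations

from typing import Any


def terminate_instances(
    client: Any,
    instance_id: str | list[str],
    wait_for_completion: bool = False,
    poll_interval: int = 20,
    max_attempts: int = 20,
) -> list[str]:
    """
    Terminate one or more EC2 instances using boto3.

    :param client: A boto3 EC2 client, e.g. ``boto3.client("ec2")``.
    :param instance_id: ID of the instance to be terminated, or a list of IDs
        to terminate several instances in one call.
    :param wait_for_completion: If True, wait for every instance to reach the
        ``terminated`` state before returning.
    :return: The IDs that termination was requested for.
    """
    # Normalize so a single ID is not split into characters.
    instance_ids = [instance_id] if isinstance(instance_id, str) else list(instance_id)
    # TerminateInstances accepts many IDs in a single request.
    client.terminate_instances(InstanceIds=instance_ids)
    if wait_for_completion:
        # One waiter call polls until all listed instances are terminated.
        client.get_waiter("instance_terminated").wait(
            InstanceIds=instance_ids,
            WaiterConfig={"Delay": poll_interval, "MaxAttempts": max_attempts},
        )
    return instance_ids
```

   With real credentials, a call might look like `terminate_instances(boto3.client("ec2"), ["i-0abc", "i-0def"], wait_for_completion=True)`. The instance IDs here are placeholders.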



##########
airflow/providers/amazon/aws/operators/ec2.py:
##########
@@ -116,3 +116,139 @@ def execute(self, context: Context):
             target_state="stopped",
             check_interval=self.check_interval,
         )
+
+
+class EC2CreateInstanceOperator(BaseOperator):
+    """
+    Create and start an EC2 Instance using boto3

Review Comment:
   Technically, this could start multiple instances depending on the `max_count` and `min_count` args. Let's update this description to account for that behavior too.
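   For illustration, a `run_instances` response can list more than one instance when `MaxCount` > 1. A description such as "Create and start one or more EC2 instances using boto3" would match that. The sketch below mirrors the ID-collecting loop in `execute` and adds a caller-side check that `min_count` does not exceed `max_count`. Both helper names are hypothetical, and the response dict is a hand-built stand-in, not real API output.

```python
from __future__ import annotations


def instance_ids_from_response(response: dict) -> list[str]:
    """Collect every instance ID from a boto3 ``run_instances`` response."""
    # With MaxCount > 1, "Instances" can hold several entries; keep them all.
    return [instance["InstanceId"] for instance in response["Instances"]]


def check_instance_counts(min_count: int, max_count: int) -> None:
    """Caller-side sanity check before calling run_instances (hypothetical)."""
    if min_count > max_count:
        raise ValueError(f"min_count ({min_count}) must not exceed max_count ({max_count})")


check_instance_counts(min_count=1, max_count=3)
fake_response = {"Instances": [{"InstanceId": "i-0abc"}, {"InstanceId": "i-0def"}]}
print(instance_ids_from_response(fake_response))  # ['i-0abc', 'i-0def']
```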



##########
airflow/providers/amazon/aws/operators/ec2.py:
##########
@@ -116,3 +116,139 @@ def execute(self, context: Context):
+    template_fields: Sequence[str] = (
+        "image_id",
+        "max_count",
+        "min_count",
+        "aws_conn_id",
+        "region_name",
+        "config",
+        "wait_for_completion",

Review Comment:
   I'm curious what the use case would be for having `wait_for_completion` as a template field. What did you have in mind here?
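   One reason a templated boolean can be surprising (an assumption, not stated in the thread): Airflow renders templated string fields with Jinja, and by default the result is a string unless the DAG sets `render_template_as_native_obj=True`. A value such as `"{{ params.wait }}"` (hypothetical param name) would then arrive as the string `"False"`, which Python treats as truthy. A stdlib-only illustration with a hypothetical `coerce_bool` helper:

```python
def coerce_bool(value: object) -> bool:
    """Interpret a possibly rendered template value as a boolean (hypothetical helper)."""
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        normalized = value.strip().lower()
        if normalized in {"true", "1", "yes"}:
            return True
        if normalized in {"false", "0", "no", ""}:
            return False
    # Anything else (other strings, numbers, None) is rejected explicitly.
    raise ValueError(f"Cannot interpret {value!r} as a boolean")


# A rendered template is a string, and any non-empty string is truthy:
print(bool("False"))  # True
print(coerce_bool("False"))  # False
```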



-- 
This is an automated message from the Apache Git Service.