You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "josh-fell (via GitHub)" <gi...@apache.org> on 2023/02/22 19:10:32 UTC

[GitHub] [airflow] josh-fell commented on a diff in pull request #29548: Add `EC2CreateInstanceOperator`, `EC2TerminateInstanceOperator`

josh-fell commented on code in PR #29548:
URL: https://github.com/apache/airflow/pull/29548#discussion_r1113095186


##########
airflow/providers/amazon/aws/operators/ec2.py:
##########
@@ -116,3 +116,141 @@ def execute(self, context: Context):
             target_state="stopped",
             check_interval=self.check_interval,
         )
+
+
+class EC2CreateInstanceOperator(BaseOperator):
+    """
+    Create and start a specified number of EC2 Instances using boto3.
+
+    The operator calls ``run_instances`` on the hook's client connection and
+    returns the IDs of the instances reported in the response.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EC2CreateInstanceOperator`
+
+    :param image_id: ID of the AMI used to create the instance.
+    :param max_count: Maximum number of instances to launch. Defaults to 1.
+        Passed to ``run_instances`` as ``MaxCount``.
+    :param min_count: Minimum number of instances to launch. Defaults to 1.
+        Passed to ``run_instances`` as ``MinCount``.
+    :param aws_conn_id: AWS connection to use. Defaults to ``aws_default``.
+    :param region_name: AWS region name associated with the client.
+    :param poll_interval: Number of seconds to wait before attempting to
+        check state of instance. Only used if wait_for_completion is True. Default is 20.
+        Passed to the waiter as ``Delay``.
+    :param max_attempts: Maximum number of attempts when checking state of instance.
+        Only used if wait_for_completion is True. Default is 20.
+        Passed to the waiter as ``MaxAttempts``.
+    :param config: Dictionary for arbitrary parameters to the boto3 run_instances call.
+        Its items are unpacked as extra keyword arguments; an empty dictionary is
+        used when nothing is supplied.
+    :param wait_for_completion: If True, the operator will wait for the instance to be
+        in the `running` state before returning. Default is False.
+    """
+
+    # Attribute names exposed for templating. NOTE(review): how these are rendered
+    # is decided by BaseOperator, which is defined outside this view.
+    template_fields: Sequence[str] = (
+        "image_id",
+        "max_count",
+        "min_count",
+        "aws_conn_id",
+        "region_name",
+        "config",
+        "wait_for_completion",
+    )
+
+    def __init__(
+        self,
+        image_id: str,
+        max_count: int = 1,
+        min_count: int = 1,
+        aws_conn_id: str = "aws_default",
+        region_name: str | None = None,
+        poll_interval: int = 20,
+        max_attempts: int = 20,
+        config: dict | None = None,
+        wait_for_completion: bool = False,
+        **kwargs,
+    ):
+        # Remaining keyword arguments are forwarded to the base operator untouched.
+        super().__init__(**kwargs)
+        self.image_id = image_id
+        self.max_count = max_count
+        self.min_count = min_count
+        self.aws_conn_id = aws_conn_id
+        self.region_name = region_name
+        self.poll_interval = poll_interval
+        self.max_attempts = max_attempts
+        # Normalise a missing (or otherwise falsy) config to an empty dict so it
+        # can always be unpacked with ``**`` in execute().
+        self.config = config or {}
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context):
+        """
+        Launch the instances and optionally wait for each one to be running.
+
+        :param context: Task context supplied by the caller; not read by this method.
+        :return: List of the ``InstanceId`` values found in the ``run_instances`` response.
+        """
+        ec2_hook = EC2Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name, api_type="client_type")
+        # Only the "Instances" entry of the run_instances response is used.
+        instances = ec2_hook.conn.run_instances(
+            ImageId=self.image_id,
+            MinCount=self.min_count,
+            MaxCount=self.max_count,
+            **self.config,
+        )["Instances"]
+        instance_ids = []
+        for instance in instances:
+            instance_ids.append(instance["InstanceId"])
+            self.log.info("Created EC2 instance %s", instance["InstanceId"])
+
+            # Instances are waited on one at a time, in response order: the wait
+            # for this instance finishes before the next ID is recorded.
+            # NOTE(review): behaviour when the waiter runs out of attempts is
+            # defined by the waiter itself, not shown here -- confirm.
+            if self.wait_for_completion:
+                ec2_hook.get_waiter("instance_running").wait(
+                    InstanceIds=[instance["InstanceId"]],
+                    WaiterConfig={
+                        "Delay": self.poll_interval,
+                        "MaxAttempts": self.max_attempts,
+                    },
+                )
+
+        return instance_ids
+
+
+class EC2TerminateInstanceOperator(BaseOperator):
+    """
+    Terminate EC2 Instances using boto3
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EC2TerminateInstanceOperator`
+
+    :instance_id: ID of the instance to be terminated.

Review Comment:
   ```suggestion
       :param instance_ids: ID of the instance to be terminated.
   ```
   Just so this parameter is properly displayed in the Python API docs.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org