Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/10 12:54:28 UTC

[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

feluelle commented on a change in pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#discussion_r422639912



##########
File path: airflow/providers/amazon/aws/hooks/glue.py
##########
@@ -0,0 +1,180 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueJobHook(AwsBaseHook):
+    """
+    Interact with AWS Glue - create job, trigger, crawler
+
+    :param job_name: unique job name per AWS account
+    :type job_name: Optional[str]
+    :param desc: job description
+    :type desc: Optional[str]
+    :param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+    :type concurrent_run_limit: int
+    :param script_location: path to etl script on s3
+    :type script_location: Optional[str]
+    :param retry_limit: Maximum number of times to retry this job if it fails
+    :type retry_limit: int
+    :param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+    :type num_of_dpus: int
+    :param region_name: aws region name (example: us-east-1)
+    :type region_name: Optional[str]
+    :param iam_role_name: AWS IAM Role for Glue Job
+    :type iam_role_name: Optional[str]
+    :param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+    :type s3_bucket: Optional[str]
+    """
+    JOB_POLL_INTERVAL = 6  # polls job status after every JOB_POLL_INTERVAL seconds
+
+    def __init__(self,
+                 s3_bucket=None,
+                 job_name=None,
+                 desc=None,
+                 concurrent_run_limit=1,
+                 script_location=None,
+                 retry_limit=0,
+                 num_of_dpus=10,
+                 region_name=None,
+                 iam_role_name=None,
+                 *args, **kwargs):
+        self.job_name = job_name
+        self.desc = desc
+        self.concurrent_run_limit = concurrent_run_limit
+        self.script_location = script_location
+        self.retry_limit = retry_limit
+        self.num_of_dpus = num_of_dpus
+        self.region_name = region_name
+        self.s3_bucket = s3_bucket
+        self.role_name = iam_role_name
+        self.s3_glue_logs = 'logs/glue-logs/'
+        super(AwsGlueJobHook, self).__init__(client_type='glue', *args, **kwargs)
+
+    def get_conn(self):
+        """
+        :return: connection
+        """
+        conn = self.get_client_type('glue', self.region_name)
+        return conn

Review comment:
       Since https://github.com/apache/airflow/pull/7541 you don't need this anymore; `AwsBaseHook` already provides `get_conn()` based on the `client_type` passed to its `__init__`.
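
       For reference, a minimal sketch of the constructor this comment implies, assuming `AwsBaseHook` (after that PR) accepts `client_type` and `region_name` and exposes `get_conn()` itself; everything beyond the parameter names taken from the diff above is an assumption, not the PR's final code.

```python
# Minimal sketch (not part of the PR) of the hook relying on AwsBaseHook's
# client handling, so no get_conn override is needed in the subclass.
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


class AwsGlueJobHook(AwsBaseHook):
    """Interact with AWS Glue: create and trigger Glue jobs."""

    JOB_POLL_INTERVAL = 6  # seconds between successive job status polls

    def __init__(self, s3_bucket=None, job_name=None, desc=None,
                 concurrent_run_limit=1, script_location=None, retry_limit=0,
                 num_of_dpus=10, region_name=None, iam_role_name=None,
                 *args, **kwargs):
        self.job_name = job_name
        self.desc = desc
        self.concurrent_run_limit = concurrent_run_limit
        self.script_location = script_location
        self.retry_limit = retry_limit
        self.num_of_dpus = num_of_dpus
        self.s3_bucket = s3_bucket
        self.role_name = iam_role_name
        self.s3_glue_logs = 'logs/glue-logs/'
        # client_type='glue' makes the inherited get_conn() return a boto3 Glue
        # client; forwarding region_name to the base hook is an assumption here.
        super().__init__(*args, client_type='glue', region_name=region_name, **kwargs)
```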

##########
File path: tests/providers/amazon/aws/hooks/test_aws_glue_job_hook.py
##########
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+import unittest
+
+import mock
+
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+
+try:
+    from moto import mock_iam
+except ImportError:
+    mock_iam = None
+
+
+class TestGlueJobHook(unittest.TestCase):
+    def setUp(self):
+        self.some_aws_region = "us-west-2"
+
+    @mock.patch.object(AwsGlueJobHook, 'get_conn')
+    def test_get_conn_returns_a_boto3_connection(self, mock_get_conn):
+        hook = AwsGlueJobHook(job_name='aws_test_glue_job', s3_bucket='some_bucket')
+        self.assertIsNotNone(hook.get_conn())

Review comment:
       This test can then also be removed; it only asserts that a mocked `get_conn` returns something.
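
       If some coverage is still wanted after dropping it, a small sketch of an alternative check is below; the `client_type` attribute on the base hook is an assumption.

```python
# Hypothetical replacement check (not part of the PR); assumes AwsBaseHook keeps
# the client_type passed at construction time as a plain attribute.
import unittest

from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook


class TestGlueJobHookConfig(unittest.TestCase):
    def test_hook_is_configured_for_glue(self):
        hook = AwsGlueJobHook(job_name='aws_test_glue_job', s3_bucket='some_bucket')
        # Constructing the hook records configuration only; no AWS call is made.
        self.assertEqual(hook.client_type, 'glue')
```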




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org