Posted to commits@airflow.apache.org by jo...@apache.org on 2022/06/03 23:05:20 UTC

[airflow] branch main updated: Remove old Athena Sample DAG (#24170)

This is an automated email from the ASF dual-hosted git repository.

joshfell pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5b32966c35 Remove old Athena Sample DAG (#24170)
5b32966c35 is described below

commit 5b32966c3545e2ed5182975764efb750eb7a3477
Author: D. Ferruzzi <fe...@amazon.com>
AuthorDate: Fri Jun 3 16:04:59 2022 -0700

    Remove old Athena Sample DAG (#24170)
---
 .../amazon/aws/example_dags/example_athena.py      | 124 ---------------------
 1 file changed, 124 deletions(-)

diff --git a/airflow/providers/amazon/aws/example_dags/example_athena.py b/airflow/providers/amazon/aws/example_dags/example_athena.py
deleted file mode 100644
index 26b47bdfa1..0000000000
--- a/airflow/providers/amazon/aws/example_dags/example_athena.py
+++ /dev/null
@@ -1,124 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from datetime import datetime
-from os import getenv
-
-from airflow import DAG
-from airflow.decorators import task
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.providers.amazon.aws.operators.athena import AthenaOperator
-from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator, S3DeleteObjectsOperator
-from airflow.providers.amazon.aws.sensors.athena import AthenaSensor
-
-S3_BUCKET = getenv('S3_BUCKET', 'test-bucket')
-S3_KEY = getenv('S3_KEY', 'athena-demo')
-ATHENA_TABLE = getenv('ATHENA_TABLE', 'test_table')
-ATHENA_DATABASE = getenv('ATHENA_DATABASE', 'default')
-
-SAMPLE_DATA = """"Alice",20
-"Bob",25
-"Charlie",30
-"""
-SAMPLE_FILENAME = 'airflow_sample.csv'
-
-
-@task
-def read_results_from_s3(query_execution_id):
-    s3_hook = S3Hook()
-    file_obj = s3_hook.get_conn().get_object(Bucket=S3_BUCKET, Key=f'{S3_KEY}/{query_execution_id}.csv')
-    file_content = file_obj['Body'].read().decode('utf-8')
-    print(file_content)
-
-
-QUERY_CREATE_TABLE = f"""
-CREATE EXTERNAL TABLE IF NOT EXISTS {ATHENA_DATABASE}.{ATHENA_TABLE} ( `name` string, `age` int )
-ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
-WITH SERDEPROPERTIES ( 'serialization.format' = ',', 'field.delim' = ','
-) LOCATION 's3://{S3_BUCKET}/{S3_KEY}/{ATHENA_TABLE}'
-TBLPROPERTIES ('has_encrypted_data'='false')
-"""
-
-QUERY_READ_TABLE = f"""
-SELECT * from {ATHENA_DATABASE}.{ATHENA_TABLE}
-"""
-
-QUERY_DROP_TABLE = f"""
-DROP TABLE IF EXISTS {ATHENA_DATABASE}.{ATHENA_TABLE}
-"""
-
-with DAG(
-    dag_id='example_athena',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    tags=['example'],
-    catchup=False,
-) as dag:
-
-    upload_sample_data = S3CreateObjectOperator(
-        task_id='upload_sample_data',
-        s3_bucket=S3_BUCKET,
-        s3_key=f'{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}',
-        data=SAMPLE_DATA,
-        replace=True,
-    )
-
-    create_table = AthenaOperator(
-        task_id='create_table',
-        query=QUERY_CREATE_TABLE,
-        database=ATHENA_DATABASE,
-        output_location=f's3://{S3_BUCKET}/{S3_KEY}',
-    )
-
-    # [START howto_operator_athena]
-    read_table = AthenaOperator(
-        task_id='read_table',
-        query=QUERY_READ_TABLE,
-        database=ATHENA_DATABASE,
-        output_location=f's3://{S3_BUCKET}/{S3_KEY}',
-    )
-    # [END howto_operator_athena]
-
-    # [START howto_sensor_athena]
-    await_query = AthenaSensor(
-        task_id='await_query',
-        query_execution_id=read_table.output,
-    )
-    # [END howto_sensor_athena]
-
-    drop_table = AthenaOperator(
-        task_id='drop_table',
-        query=QUERY_DROP_TABLE,
-        database=ATHENA_DATABASE,
-        output_location=f's3://{S3_BUCKET}/{S3_KEY}',
-    )
-
-    remove_s3_files = S3DeleteObjectsOperator(
-        task_id='remove_s3_files',
-        bucket=S3_BUCKET,
-        prefix=S3_KEY,
-    )
-
-    (
-        upload_sample_data
-        >> create_table
-        >> read_table
-        >> await_query
-        >> read_results_from_s3(read_table.output)
-        >> drop_table
-        >> remove_s3_files
-    )
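
Note for readers who followed a documentation link to the deleted example: the operator/sensor pairing it demonstrated (the blocks between the howto markers above) can be condensed into the minimal sketch below. This is not an official replacement; it reuses only calls that appear in the deleted file, and the dag_id plus the literal bucket, database, and table values are placeholder assumptions standing in for the old S3_BUCKET, S3_KEY, ATHENA_DATABASE, and ATHENA_TABLE environment-variable defaults.

    # Minimal sketch, assuming the amazon provider is installed and AWS
    # credentials are configured. All literal values are placeholders.
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.amazon.aws.operators.athena import AthenaOperator
    from airflow.providers.amazon.aws.sensors.athena import AthenaSensor

    with DAG(
        dag_id='example_athena_minimal',  # hypothetical DAG id
        schedule_interval=None,
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        # Submit the query; the operator pushes the Athena query
        # execution id to XCom, exposed here as read_table.output.
        read_table = AthenaOperator(
            task_id='read_table',
            query='SELECT * FROM default.test_table',
            database='default',
            output_location='s3://test-bucket/athena-demo',
        )

        # Block until that query execution reaches a success state.
        await_query = AthenaSensor(
            task_id='await_query',
            query_execution_id=read_table.output,
        )

        read_table >> await_query

As in the removed file, wiring the sensor to read_table.output keeps the two tasks decoupled: the sensor polls Athena by execution id rather than the operator waiting synchronously for the query to finish.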