Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/03/24 23:26:24 UTC

[GitHub] [airflow] ferruzzi opened a new pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

ferruzzi opened a new pull request #22517:
URL: https://github.com/apache/airflow/pull/22517


   Does what it says on the tin: adds a sample DAG and docs for an existing transfer.
   
   Part of a project to simplify and standardize AWS sample DAGs and docs in preparation for adding System Testing.
   
   Related: https://github.com/apache/airflow/pull/21523
   Related: https://github.com/apache/airflow/pull/21475
   Related: https://github.com/apache/airflow/pull/21828
   Related: https://github.com/apache/airflow/pull/21920
   etc... 
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/main/UPDATING.md).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ferruzzi commented on a change in pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#discussion_r838749074



##########
File path: airflow/providers/amazon/aws/example_dags/example_hive_to_dynamodb.py
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+   This DAG will not work unless you create an Amazon EMR cluster running
+   Apache Hive and copy data into it following steps 1-4 (inclusive) here:
+   https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.html
+"""
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Connection
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
+from airflow.providers.amazon.aws.transfers.hive_to_dynamodb import HiveToDynamoDBOperator
+from airflow.utils import db
+
+DYNAMODB_TABLE_NAME = 'example_hive_to_dynamodb_table'
+HIVE_CONNECTION_ID = os.getenv('HIVE_CONNECTION_ID', 'hive_on_emr')
+HIVE_HOSTNAME = os.getenv('HIVE_HOSTNAME', 'ec2-123-45-67-890.compute-1.amazonaws.com')
+
+# These values assume you set up the Hive data source following the link above.
+DYNAMODB_TABLE_HASH_KEY = 'feature_id'
+HIVE_SQL = 'SELECT feature_id, feature_name, feature_class, state_alpha FROM hive_features'
+
+
+@task
+def create_dynamodb_table():
+    client = DynamoDBHook(client_type='dynamodb').conn
+    client.create_table(
+        TableName=DYNAMODB_TABLE_NAME,
+        KeySchema=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'KeyType': 'HASH'},
+        ],
+        AttributeDefinitions=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'AttributeType': 'N'},
+        ],
+        ProvisionedThroughput={'ReadCapacityUnits': 20, 'WriteCapacityUnits': 20},
+    )
+
+    # DynamoDB table creation is nearly, but not quite, instantaneous.
+    # Wait for the table to be active to avoid race conditions writing to it.
+    waiter = client.get_waiter('table_exists')
+    waiter.wait(TableName=DYNAMODB_TABLE_NAME, WaiterConfig={'Delay': 1})
+
+
+@task
+def get_dynamodb_item_count():
+    """
+    A DynamoDB table has an ItemCount value, but it is only updated every six hours.
+    To verify this DAG worked, we will scan the table and count the items manually.
+    """
+    table = DynamoDBHook(resource_type='dynamodb').conn.Table(DYNAMODB_TABLE_NAME)
+
+    response = table.scan(Select='COUNT')
+    item_count = response['Count']
+
+    while 'LastEvaluatedKey' in response:
+        response = table.scan(Select='COUNT', ExclusiveStartKey=response['LastEvaluatedKey'])
+        item_count += response['Count']
+
+    print(f'DynamoDB table contains {item_count} items.')
+
+
+# Included for sample purposes only; in production you wouldn't delete your fresh backup.
+def delete_dynamodb_table_fn():
+    DynamoDBHook(client_type='dynamodb').conn.delete_table(TableName=DYNAMODB_TABLE_NAME)
+
+
+# Included for sample purposes only; in production this should
+# be configured in the environment and not be part of the DAG.
+# Note: The 'hiveserver2_default' connection will not work if Hive
+# is hosted on EMR.  You must set the host name of the connection
+# to match your EMR cluster's hostname.
+@task
+def configure_hive_connection():
+    db.merge_conn(
+        Connection(
+            conn_id=HIVE_CONNECTION_ID,
+            conn_type='hiveserver2',
+            host=HIVE_HOSTNAME,
+            port=10000,
+        )
+    )
+
+
+with DAG(
+    dag_id='example_hive_to_dynamodb',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as dag:
+    # Add the prerequisites docstring to the DAG in the UI.
+    dag.doc_md = __doc__
+
+    # [START howto_transfer_hive_to_dynamodb]
+    backup_to_dynamodb = HiveToDynamoDBOperator(
+        task_id='backup_to_dynamodb',
+        hiveserver2_conn_id=HIVE_CONNECTION_ID,
+        sql=HIVE_SQL,
+        table_name=DYNAMODB_TABLE_NAME,
+        table_keys=[DYNAMODB_TABLE_HASH_KEY],
+    )
+    # [END howto_transfer_hive_to_dynamodb]
+
+    # Using 'all_done' so even if an intermediate step fails, the DAG will clean up after itself.
+    delete_dynamodb_table = PythonOperator(
+        task_id="delete_dynamodb_table",
+        python_callable=delete_dynamodb_table_fn,
+        trigger_rule='all_done',
+    )

Review comment:
       I couldn't figure out a way to do it with the trigger_rule.  I had tried `@task(trigger_rule='all_done')` as seen in 36368e96d3543e5ae873f135808d50f29cbfbcb6 but that fails static checks.







[GitHub] [airflow] ferruzzi commented on pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#issuecomment-1085079077


   @potiuk  It was an adventure, not an Odyssey :stuck_out_tongue: 





[GitHub] [airflow] ferruzzi commented on a change in pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#discussion_r838967189



##########
File path: airflow/providers/amazon/aws/example_dags/example_hive_to_dynamodb.py
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+   This DAG will not work unless you create an Amazon EMR cluster running
+   Apache Hive and copy data into it following steps 1-4 (inclusive) here:
+   https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.html
+"""
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Connection
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
+from airflow.providers.amazon.aws.transfers.hive_to_dynamodb import HiveToDynamoDBOperator
+from airflow.utils import db
+
+DYNAMODB_TABLE_NAME = 'example_hive_to_dynamodb_table'
+HIVE_CONNECTION_ID = os.getenv('HIVE_CONNECTION_ID', 'hive_on_emr')
+HIVE_HOSTNAME = os.getenv('HIVE_HOSTNAME', 'ec2-123-45-67-890.compute-1.amazonaws.com')
+
+# These values assume you set up the Hive data source following the link above.
+DYNAMODB_TABLE_HASH_KEY = 'feature_id'
+HIVE_SQL = 'SELECT feature_id, feature_name, feature_class, state_alpha FROM hive_features'
+
+
+@task
+def create_dynamodb_table():
+    client = DynamoDBHook(client_type='dynamodb').conn
+    client.create_table(
+        TableName=DYNAMODB_TABLE_NAME,
+        KeySchema=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'KeyType': 'HASH'},
+        ],
+        AttributeDefinitions=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'AttributeType': 'N'},
+        ],
+        ProvisionedThroughput={'ReadCapacityUnits': 20, 'WriteCapacityUnits': 20},
+    )
+
+    # DynamoDB table creation is nearly, but not quite, instantaneous.
+    # Wait for the table to be active to avoid race conditions writing to it.
+    waiter = client.get_waiter('table_exists')
+    waiter.wait(TableName=DYNAMODB_TABLE_NAME, WaiterConfig={'Delay': 1})
+
+
+@task
+def get_dynamodb_item_count():
+    """
+    A DynamoDB table has an ItemCount value, but it is only updated every six hours.
+    To verify this DAG worked, we will scan the table and count the items manually.
+    """
+    table = DynamoDBHook(resource_type='dynamodb').conn.Table(DYNAMODB_TABLE_NAME)
+
+    response = table.scan(Select='COUNT')
+    item_count = response['Count']
+
+    while 'LastEvaluatedKey' in response:
+        response = table.scan(Select='COUNT', ExclusiveStartKey=response['LastEvaluatedKey'])
+        item_count += response['Count']
+
+    print(f'DynamoDB table contains {item_count} items.')
+
+
+# Included for sample purposes only; in production you wouldn't delete your fresh backup.
+def delete_dynamodb_table_fn():
+    DynamoDBHook(client_type='dynamodb').conn.delete_table(TableName=DYNAMODB_TABLE_NAME)
+
+
+# Included for sample purposes only; in production this should
+# be configured in the environment and not be part of the DAG.
+# Note: The 'hiveserver2_default' connection will not work if Hive
+# is hosted on EMR.  You must set the host name of the connection
+# to match your EMR cluster's hostname.
+@task
+def configure_hive_connection():
+    db.merge_conn(
+        Connection(
+            conn_id=HIVE_CONNECTION_ID,
+            conn_type='hiveserver2',
+            host=HIVE_HOSTNAME,
+            port=10000,
+        )
+    )
+
+
+with DAG(
+    dag_id='example_hive_to_dynamodb',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as dag:
+    # Add the prerequisites docstring to the DAG in the UI.
+    dag.doc_md = __doc__
+
+    # [START howto_transfer_hive_to_dynamodb]
+    backup_to_dynamodb = HiveToDynamoDBOperator(
+        task_id='backup_to_dynamodb',
+        hiveserver2_conn_id=HIVE_CONNECTION_ID,
+        sql=HIVE_SQL,
+        table_name=DYNAMODB_TABLE_NAME,
+        table_keys=[DYNAMODB_TABLE_HASH_KEY],
+    )
+    # [END howto_transfer_hive_to_dynamodb]
+
+    # Using 'all_done' so even if an intermediate step fails, the DAG will clean up after itself.
+    delete_dynamodb_table = PythonOperator(
+        task_id="delete_dynamodb_table",
+        python_callable=delete_dynamodb_table_fn,
+        trigger_rule='all_done',
+    )

Review comment:
       That didn't age well.  :stuck_out_tongue:  Rebased and pushed the reversion.







[GitHub] [airflow] ferruzzi edited a comment on pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
ferruzzi edited a comment on pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#issuecomment-1078557899


   Gah.  PyCharm flagged that as a warning but it passed the local CI (`./scripts/ci/testing/ci_run_airflow_testing.sh`) so I thought it should pass here.  Is there another way to use the task decorator in this case or should I convert it to a PythonOperator?





[GitHub] [airflow] ferruzzi commented on a change in pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#discussion_r838786019



##########
File path: airflow/providers/amazon/aws/example_dags/example_hive_to_dynamodb.py
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+   This DAG will not work unless you create an Amazon EMR cluster running
+   Apache Hive and copy data into it following steps 1-4 (inclusive) here:
+   https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.html
+"""
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Connection
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
+from airflow.providers.amazon.aws.transfers.hive_to_dynamodb import HiveToDynamoDBOperator
+from airflow.utils import db
+
+DYNAMODB_TABLE_NAME = 'example_hive_to_dynamodb_table'
+HIVE_CONNECTION_ID = os.getenv('HIVE_CONNECTION_ID', 'hive_on_emr')
+HIVE_HOSTNAME = os.getenv('HIVE_HOSTNAME', 'ec2-123-45-67-890.compute-1.amazonaws.com')
+
+# These values assume you set up the Hive data source following the link above.
+DYNAMODB_TABLE_HASH_KEY = 'feature_id'
+HIVE_SQL = 'SELECT feature_id, feature_name, feature_class, state_alpha FROM hive_features'
+
+
+@task
+def create_dynamodb_table():
+    client = DynamoDBHook(client_type='dynamodb').conn
+    client.create_table(
+        TableName=DYNAMODB_TABLE_NAME,
+        KeySchema=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'KeyType': 'HASH'},
+        ],
+        AttributeDefinitions=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'AttributeType': 'N'},
+        ],
+        ProvisionedThroughput={'ReadCapacityUnits': 20, 'WriteCapacityUnits': 20},
+    )
+
+    # DynamoDB table creation is nearly, but not quite, instantaneous.
+    # Wait for the table to be active to avoid race conditions writing to it.
+    waiter = client.get_waiter('table_exists')
+    waiter.wait(TableName=DYNAMODB_TABLE_NAME, WaiterConfig={'Delay': 1})
+
+
+@task
+def get_dynamodb_item_count():
+    """
+    A DynamoDB table has an ItemCount value, but it is only updated every six hours.
+    To verify this DAG worked, we will scan the table and count the items manually.
+    """
+    table = DynamoDBHook(resource_type='dynamodb').conn.Table(DYNAMODB_TABLE_NAME)
+
+    response = table.scan(Select='COUNT')
+    item_count = response['Count']
+
+    while 'LastEvaluatedKey' in response:
+        response = table.scan(Select='COUNT', ExclusiveStartKey=response['LastEvaluatedKey'])
+        item_count += response['Count']
+
+    print(f'DynamoDB table contains {item_count} items.')
+
+
+# Included for sample purposes only; in production you wouldn't delete your fresh backup.
+def delete_dynamodb_table_fn():
+    DynamoDBHook(client_type='dynamodb').conn.delete_table(TableName=DYNAMODB_TABLE_NAME)
+
+
+# Included for sample purposes only; in production this should
+# be configured in the environment and not be part of the DAG.
+# Note: The 'hiveserver2_default' connection will not work if Hive
+# is hosted on EMR.  You must set the host name of the connection
+# to match your EMR cluster's hostname.
+@task
+def configure_hive_connection():
+    db.merge_conn(
+        Connection(
+            conn_id=HIVE_CONNECTION_ID,
+            conn_type='hiveserver2',
+            host=HIVE_HOSTNAME,
+            port=10000,
+        )
+    )
+
+
+with DAG(
+    dag_id='example_hive_to_dynamodb',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as dag:
+    # Add the prerequisites docstring to the DAG in the UI.
+    dag.doc_md = __doc__
+
+    # [START howto_transfer_hive_to_dynamodb]
+    backup_to_dynamodb = HiveToDynamoDBOperator(
+        task_id='backup_to_dynamodb',
+        hiveserver2_conn_id=HIVE_CONNECTION_ID,
+        sql=HIVE_SQL,
+        table_name=DYNAMODB_TABLE_NAME,
+        table_keys=[DYNAMODB_TABLE_HASH_KEY],
+    )
+    # [END howto_transfer_hive_to_dynamodb]
+
+    # Using 'all_done' so even if an intermediate step fails, the DAG will clean up after itself.
+    delete_dynamodb_table = PythonOperator(
+        task_id="delete_dynamodb_table",
+        python_callable=delete_dynamodb_table_fn,
+        trigger_rule='all_done',
+    )

Review comment:
       Alright, I'll revert that and see. Maybe I just misread the error message. I'm in a meeting, but should have it pushed within the hour.







[GitHub] [airflow] ferruzzi commented on a change in pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#discussion_r838749074



##########
File path: airflow/providers/amazon/aws/example_dags/example_hive_to_dynamodb.py
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+   This DAG will not work unless you create an Amazon EMR cluster running
+   Apache Hive and copy data into it following steps 1-4 (inclusive) here:
+   https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.html
+"""
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Connection
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
+from airflow.providers.amazon.aws.transfers.hive_to_dynamodb import HiveToDynamoDBOperator
+from airflow.utils import db
+
+DYNAMODB_TABLE_NAME = 'example_hive_to_dynamodb_table'
+HIVE_CONNECTION_ID = os.getenv('HIVE_CONNECTION_ID', 'hive_on_emr')
+HIVE_HOSTNAME = os.getenv('HIVE_HOSTNAME', 'ec2-123-45-67-890.compute-1.amazonaws.com')
+
+# These values assume you set up the Hive data source following the link above.
+DYNAMODB_TABLE_HASH_KEY = 'feature_id'
+HIVE_SQL = 'SELECT feature_id, feature_name, feature_class, state_alpha FROM hive_features'
+
+
+@task
+def create_dynamodb_table():
+    client = DynamoDBHook(client_type='dynamodb').conn
+    client.create_table(
+        TableName=DYNAMODB_TABLE_NAME,
+        KeySchema=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'KeyType': 'HASH'},
+        ],
+        AttributeDefinitions=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'AttributeType': 'N'},
+        ],
+        ProvisionedThroughput={'ReadCapacityUnits': 20, 'WriteCapacityUnits': 20},
+    )
+
+    # DynamoDB table creation is nearly, but not quite, instantaneous.
+    # Wait for the table to be active to avoid race conditions writing to it.
+    waiter = client.get_waiter('table_exists')
+    waiter.wait(TableName=DYNAMODB_TABLE_NAME, WaiterConfig={'Delay': 1})
+
+
+@task
+def get_dynamodb_item_count():
+    """
+    A DynamoDB table has an ItemCount value, but it is only updated every six hours.
+    To verify this DAG worked, we will scan the table and count the items manually.
+    """
+    table = DynamoDBHook(resource_type='dynamodb').conn.Table(DYNAMODB_TABLE_NAME)
+
+    response = table.scan(Select='COUNT')
+    item_count = response['Count']
+
+    while 'LastEvaluatedKey' in response:
+        response = table.scan(Select='COUNT', ExclusiveStartKey=response['LastEvaluatedKey'])
+        item_count += response['Count']
+
+    print(f'DynamoDB table contains {item_count} items.')
+
+
+# Included for sample purposes only; in production you wouldn't delete your fresh backup.
+def delete_dynamodb_table_fn():
+    DynamoDBHook(client_type='dynamodb').conn.delete_table(TableName=DYNAMODB_TABLE_NAME)
+
+
+# Included for sample purposes only; in production this should
+# be configured in the environment and not be part of the DAG.
+# Note: The 'hiveserver2_default' connection will not work if Hive
+# is hosted on EMR.  You must set the host name of the connection
+# to match your EMR cluster's hostname.
+@task
+def configure_hive_connection():
+    db.merge_conn(
+        Connection(
+            conn_id=HIVE_CONNECTION_ID,
+            conn_type='hiveserver2',
+            host=HIVE_HOSTNAME,
+            port=10000,
+        )
+    )
+
+
+with DAG(
+    dag_id='example_hive_to_dynamodb',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as dag:
+    # Add the prerequisites docstring to the DAG in the UI.
+    dag.doc_md = __doc__
+
+    # [START howto_transfer_hive_to_dynamodb]
+    backup_to_dynamodb = HiveToDynamoDBOperator(
+        task_id='backup_to_dynamodb',
+        hiveserver2_conn_id=HIVE_CONNECTION_ID,
+        sql=HIVE_SQL,
+        table_name=DYNAMODB_TABLE_NAME,
+        table_keys=[DYNAMODB_TABLE_HASH_KEY],
+    )
+    # [END howto_transfer_hive_to_dynamodb]
+
+    # Using 'all_done' so even if an intermediate step fails, the DAG will clean up after itself.
+    delete_dynamodb_table = PythonOperator(
+        task_id="delete_dynamodb_table",
+        python_callable=delete_dynamodb_table_fn,
+        trigger_rule='all_done',
+    )

Review comment:
       I couldn't figure out a way to do it with the trigger_rule.







[GitHub] [airflow] github-actions[bot] commented on pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#issuecomment-1084921267


   The PR is likely OK to be merged with just a subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ferruzzi commented on pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#issuecomment-1085073307


   CI is green.  That concludes this little adventure in misreading error messages.  Thanks for the help, Jarek and Josh.





[GitHub] [airflow] ferruzzi commented on a change in pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#discussion_r834824998



##########
File path: airflow/providers/amazon/aws/example_dags/example_hive_to_dynamodb.py
##########
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+   This DAG will not work unless you create an Amazon EMR cluster running
+   Apache Hive and copy data into it following steps 1-4 (inclusive) here:
+   https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.html
+"""
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Connection
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
+from airflow.providers.amazon.aws.transfers.hive_to_dynamodb import HiveToDynamoDBOperator
+from airflow.utils import db
+
+DYNAMODB_TABLE_NAME = 'example_hive_to_dynamodb_table'
+HIVE_CONNECTION_ID = os.getenv('HIVE_CONNECTION_ID', 'hive_on_emr')
+HIVE_HOSTNAME = os.getenv('HIVE_HOSTNAME', 'ec2-123-45-67-890.compute-1.amazonaws.com')
+
+# These values assume you set up the Hive data source following the link above.
+DYNAMODB_TABLE_HASH_KEY = 'feature_id'
+HIVE_SQL = 'SELECT feature_id, feature_name, feature_class, state_alpha FROM hive_features'
+
+
+@task
+def create_dynamodb_table():
+    client = DynamoDBHook(client_type='dynamodb').conn
+    client.create_table(
+        TableName=DYNAMODB_TABLE_NAME,
+        KeySchema=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'KeyType': 'HASH'},
+        ],
+        AttributeDefinitions=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'AttributeType': 'N'},
+        ],
+        ProvisionedThroughput={'ReadCapacityUnits': 20, 'WriteCapacityUnits': 20},
+    )
+
+    # DynamoDB table creation is nearly, but not quite, instantaneous.
+    # Wait for the table to be active to avoid race conditions writing to it.
+    waiter = client.get_waiter('table_exists')
+    waiter.wait(TableName=DYNAMODB_TABLE_NAME, WaiterConfig={'Delay': 1})
+
+
+@task
+def get_dynamodb_item_count():
+    """
+    A DynamoDB table has an ItemCount value, but it is only updated every six hours.
+    To verify this DAG worked, we will scan the table and count the items manually.
+    """
+    table = DynamoDBHook(resource_type='dynamodb').conn.Table(DYNAMODB_TABLE_NAME)
+
+    response = table.scan(Select='COUNT')
+    item_count = response['Count']
+
+    while 'LastEvaluatedKey' in response:
+        response = table.scan(Select='COUNT', ExclusiveStartKey=response['LastEvaluatedKey'])
+        item_count += response['Count']
+
+    print(f'DynamoDB table contains {item_count} items.')
+
+
+# Included for sample purposes only; in production you wouldn't delete
+# the table you just backed your data up to.  Using 'all_done' so even
+# if an intermediate step fails, the DAG will clean up after itself.
+@task(trigger_rule='all_done')
+def delete_dynamodb_table():
+    DynamoDBHook(client_type='dynamodb').conn.delete_table(TableName=DYNAMODB_TABLE_NAME)
+
+
+# Included for sample purposes only;  In production this should
+# be configured in the environment and not be part of the DAG.
+# Note: The 'hiveserver2_default' connection will not work if Hive
+# is hosted on EMR.  You must set the host name of the connection
+# to match your EMR cluster's hostname.
+@task
+def configure_hive_connection():
+    db.merge_conn(
+        Connection(
+            conn_id=HIVE_CONNECTION_ID,
+            conn_type='hiveserver2',
+            host=HIVE_HOSTNAME,
+            port=10000,
+        )
+    )
+
+
+with DAG(
+    dag_id='example_hive_to_dynamodb',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as dag:
+    # Add the prerequisites docstring to the DAG in the UI.
+    dag.doc_md = __doc__

Review comment:
       I've removed this from others as I go through them, but in this case I thought it was a nice touch to add a disclaimer that the DAG won't run without prior setup.  I can remove it if we would rather keep it uniform.







[GitHub] [airflow] ferruzzi edited a comment on pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
ferruzzi edited a comment on pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#issuecomment-1078557899


   Gah.  PyCharm flagged that as a warning but it passed the local CI (`./scripts/ci/testing/ci_run_airflow_testing.sh`) so I thought it might pass.  Is there another way to use the task decorator in this case or should I convert it to a PythonOperator?





[GitHub] [airflow] josh-fell commented on a change in pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#discussion_r838714572



##########
File path: airflow/providers/amazon/aws/example_dags/example_hive_to_dynamodb.py
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+   This DAG will not work unless you create an Amazon EMR cluster running
+   Apache Hive and copy data into it following steps 1-4 (inclusive) here:
+   https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.html
+"""
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Connection
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
+from airflow.providers.amazon.aws.transfers.hive_to_dynamodb import HiveToDynamoDBOperator
+from airflow.utils import db
+
+DYNAMODB_TABLE_NAME = 'example_hive_to_dynamodb_table'
+HIVE_CONNECTION_ID = os.getenv('HIVE_CONNECTION_ID', 'hive_on_emr')
+HIVE_HOSTNAME = os.getenv('HIVE_HOSTNAME', 'ec2-123-45-67-890.compute-1.amazonaws.com')
+
+# These values assume you set up the Hive data source following the link above.
+DYNAMODB_TABLE_HASH_KEY = 'feature_id'
+HIVE_SQL = 'SELECT feature_id, feature_name, feature_class, state_alpha FROM hive_features'
+
+
+@task
+def create_dynamodb_table():
+    client = DynamoDBHook(client_type='dynamodb').conn
+    client.create_table(
+        TableName=DYNAMODB_TABLE_NAME,
+        KeySchema=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'KeyType': 'HASH'},
+        ],
+        AttributeDefinitions=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'AttributeType': 'N'},
+        ],
+        ProvisionedThroughput={'ReadCapacityUnits': 20, 'WriteCapacityUnits': 20},
+    )
+
+    # DynamoDB table creation is nearly, but not quite, instantaneous.
+    # Wait for the table to be active to avoid race conditions writing to it.
+    waiter = client.get_waiter('table_exists')
+    waiter.wait(TableName=DYNAMODB_TABLE_NAME, WaiterConfig={'Delay': 1})
+
+
+@task
+def get_dynamodb_item_count():
+    """
+    A DynamoDB table has an ItemCount value, but it is only updated every six hours.
+    To verify this DAG worked, we will scan the table and count the items manually.
+    """
+    table = DynamoDBHook(resource_type='dynamodb').conn.Table(DYNAMODB_TABLE_NAME)
+
+    response = table.scan(Select='COUNT')
+    item_count = response['Count']
+
+    while 'LastEvaluatedKey' in response:
+        response = table.scan(Select='COUNT', ExclusiveStartKey=response['LastEvaluatedKey'])
+        item_count += response['Count']
+
+    print(f'DynamoDB table contains {item_count} items.')
+
+
+# Included for sample purposes only; in production you wouldn't delete your fresh backup.
+def delete_dynamodb_table_fn():
+    DynamoDBHook(client_type='dynamodb').conn.delete_table(TableName=DYNAMODB_TABLE_NAME)
+
+
+# Included for sample purposes only; in production this should
+# be configured in the environment and not be part of the DAG.
+# Note: The 'hiveserver2_default' connection will not work if Hive
+# is hosted on EMR.  You must set the host name of the connection
+# to match your EMR cluster's hostname.
+@task
+def configure_hive_connection():
+    db.merge_conn(
+        Connection(
+            conn_id=HIVE_CONNECTION_ID,
+            conn_type='hiveserver2',
+            host=HIVE_HOSTNAME,
+            port=10000,
+        )
+    )
+
+
+with DAG(
+    dag_id='example_hive_to_dynamodb',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as dag:
+    # Add the prerequisites docstring to the DAG in the UI.
+    dag.doc_md = __doc__
+
+    # [START howto_transfer_hive_to_dynamodb]
+    backup_to_dynamodb = HiveToDynamoDBOperator(
+        task_id='backup_to_dynamodb',
+        hiveserver2_conn_id=HIVE_CONNECTION_ID,
+        sql=HIVE_SQL,
+        table_name=DYNAMODB_TABLE_NAME,
+        table_keys=[DYNAMODB_TABLE_HASH_KEY],
+    )
+    # [END howto_transfer_hive_to_dynamodb]
+
+    # Using 'all_done' so even if an intermediate step fails, the DAG will clean up after itself.
+    delete_dynamodb_table = PythonOperator(
+        task_id="delete_dynamodb_table",
+        python_callable=delete_dynamodb_table_fn,
+        trigger_rule='all_done',
+    )

Review comment:
       Should convert this to the TaskFlow API as well.
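
    For reference, a minimal sketch of what that conversion could look like, mirroring the decorator-based revision quoted earlier in this thread (the constant and hook call are copied from the example DAG; passing `trigger_rule` to the decorator assumes the decorator forwards extra arguments to the underlying operator, which is the point under discussion):

```python
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook

DYNAMODB_TABLE_NAME = 'example_hive_to_dynamodb_table'


# Included for sample purposes only; in production you wouldn't delete your fresh backup.
# Using 'all_done' so even if an intermediate step fails, the DAG will clean up after itself.
@task(trigger_rule='all_done')
def delete_dynamodb_table():
    DynamoDBHook(client_type='dynamodb').conn.delete_table(TableName=DYNAMODB_TABLE_NAME)
```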







[GitHub] [airflow] potiuk merged pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #22517:
URL: https://github.com/apache/airflow/pull/22517


   





[GitHub] [airflow] ferruzzi commented on a change in pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#discussion_r838749074



##########
File path: airflow/providers/amazon/aws/example_dags/example_hive_to_dynamodb.py
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+   This DAG will not work unless you create an Amazon EMR cluster running
+   Apache Hive and copy data into it following steps 1-4 (inclusive) here:
+   https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.html
+"""
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Connection
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
+from airflow.providers.amazon.aws.transfers.hive_to_dynamodb import HiveToDynamoDBOperator
+from airflow.utils import db
+
+DYNAMODB_TABLE_NAME = 'example_hive_to_dynamodb_table'
+HIVE_CONNECTION_ID = os.getenv('HIVE_CONNECTION_ID', 'hive_on_emr')
+HIVE_HOSTNAME = os.getenv('HIVE_HOSTNAME', 'ec2-123-45-67-890.compute-1.amazonaws.com')
+
+# These values assume you set up the Hive data source following the link above.
+DYNAMODB_TABLE_HASH_KEY = 'feature_id'
+HIVE_SQL = 'SELECT feature_id, feature_name, feature_class, state_alpha FROM hive_features'
+
+
+@task
+def create_dynamodb_table():
+    client = DynamoDBHook(client_type='dynamodb').conn
+    client.create_table(
+        TableName=DYNAMODB_TABLE_NAME,
+        KeySchema=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'KeyType': 'HASH'},
+        ],
+        AttributeDefinitions=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'AttributeType': 'N'},
+        ],
+        ProvisionedThroughput={'ReadCapacityUnits': 20, 'WriteCapacityUnits': 20},
+    )
+
+    # DynamoDB table creation is nearly, but not quite, instantaneous.
+    # Wait for the table to be active to avoid race conditions writing to it.
+    waiter = client.get_waiter('table_exists')
+    waiter.wait(TableName=DYNAMODB_TABLE_NAME, WaiterConfig={'Delay': 1})
+
+
+@task
+def get_dynamodb_item_count():
+    """
+    A DynamoDB table has an ItemCount value, but it is only updated every six hours.
+    To verify this DAG worked, we will scan the table and count the items manually.
+    """
+    table = DynamoDBHook(resource_type='dynamodb').conn.Table(DYNAMODB_TABLE_NAME)
+
+    response = table.scan(Select='COUNT')
+    item_count = response['Count']
+
+    while 'LastEvaluatedKey' in response:
+        response = table.scan(Select='COUNT', ExclusiveStartKey=response['LastEvaluatedKey'])
+        item_count += response['Count']
+
+    print(f'DynamoDB table contains {item_count} items.')
+
+
+# Included for sample purposes only; in production you wouldn't delete your fresh backup.
+def delete_dynamodb_table_fn():
+    DynamoDBHook(client_type='dynamodb').conn.delete_table(TableName=DYNAMODB_TABLE_NAME)
+
+
+# Included for sample purposes only; in production this should
+# be configured in the environment and not be part of the DAG.
+# Note: The 'hiveserver2_default' connection will not work if Hive
+# is hosted on EMR.  You must set the host name of the connection
+# to match your EMR cluster's hostname.
+@task
+def configure_hive_connection():
+    db.merge_conn(
+        Connection(
+            conn_id=HIVE_CONNECTION_ID,
+            conn_type='hiveserver2',
+            host=HIVE_HOSTNAME,
+            port=10000,
+        )
+    )
+
+
+with DAG(
+    dag_id='example_hive_to_dynamodb',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as dag:
+    # Add the prerequisites docstring to the DAG in the UI.
+    dag.doc_md = __doc__
+
+    # [START howto_transfer_hive_to_dynamodb]
+    backup_to_dynamodb = HiveToDynamoDBOperator(
+        task_id='backup_to_dynamodb',
+        hiveserver2_conn_id=HIVE_CONNECTION_ID,
+        sql=HIVE_SQL,
+        table_name=DYNAMODB_TABLE_NAME,
+        table_keys=[DYNAMODB_TABLE_HASH_KEY],
+    )
+    # [END howto_transfer_hive_to_dynamodb]
+
+    # Using 'all_done' so even if an intermediate step fails, the DAG will clean up after itself.
+    delete_dynamodb_table = PythonOperator(
+        task_id="delete_dynamodb_table",
+        python_callable=delete_dynamodb_table_fn,
+        trigger_rule='all_done',
+    )

Review comment:
       I couldn't figure out a way to do it with the trigger_rule.  I had tried `@task(trigger_rule='all_done')` as seen in 36368e96d3543e5ae873f135808d50f29cbfbcb6 but that fails static checks.
   
   See: https://apache-airflow.slack.com/archives/CCPRP7943/p1648228076933559







[GitHub] [airflow] ferruzzi closed pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
ferruzzi closed pull request #22517:
URL: https://github.com/apache/airflow/pull/22517


   





[GitHub] [airflow] potiuk commented on pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#issuecomment-1081582078


   > Option 2) Replace chain() with the bitwise notation.
   
   Followed by 
   
   > Option 3) "Fix" chain so it accepts the @task notated methods. 
   
   In a separate PR :)





[GitHub] [airflow] ferruzzi commented on pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#issuecomment-1082141214


   Created an Issue for it.  If someone else gets to it before me then even better:  https://github.com/apache/airflow/issues/22594





[GitHub] [airflow] ferruzzi commented on pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#issuecomment-1078557899


   Gah.  PyCharm flagged that as a warning but it passed the local CI (`./scripts/ci/testing/ci_run_airflow_testing.sh`) so I thought it might pass.  Is there another way to use the task decorator in this case or should I convert it to a PythonOperator?





[GitHub] [airflow] ferruzzi commented on pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#issuecomment-1081236759


   Looks like chain() doesn't like @task decorated methods.  Posted to the Slack server to discuss a fix:  https://apache-airflow.slack.com/archives/CCPRP7943/p1648498867915099
   
   Option 1) Convert the @tasks to PythonOperators.
   Option 2) Replace chain() with the bitwise notation (see the sketch below).
   Option 3) "Fix" chain so it accepts the @task notated methods.
   Option 4) Something else I haven't considered.
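   
   As a rough, self-contained sketch of option 2 (placeholder task and DAG names, not the ones from this PR): calling a `@task`-decorated function returns an XComArg, and the bitwise `>>` operator on that result sets the same dependency that `chain()` would.

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task


@task
def first():
    print('first task')


@task
def second():
    print('second task')


with DAG(
    dag_id='example_bitwise_dependencies',  # hypothetical DAG, for illustration only
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
):
    # Option 2: wire the decorated tasks with bitwise notation instead of
    # chain(first(), second()), which is the call giving trouble here.
    first() >> second()
```

   (The example DAG in this PR was later switched to this notation for 2.1 compatibility, per the rebase comment below.)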





[GitHub] [airflow] josh-fell commented on a change in pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#discussion_r838783758



##########
File path: airflow/providers/amazon/aws/example_dags/example_hive_to_dynamodb.py
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+   This DAG will not work unless you create an Amazon EMR cluster running
+   Apache Hive and copy data into it following steps 1-4 (inclusive) here:
+   https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.html
+"""
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Connection
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
+from airflow.providers.amazon.aws.transfers.hive_to_dynamodb import HiveToDynamoDBOperator
+from airflow.utils import db
+
+DYNAMODB_TABLE_NAME = 'example_hive_to_dynamodb_table'
+HIVE_CONNECTION_ID = os.getenv('HIVE_CONNECTION_ID', 'hive_on_emr')
+HIVE_HOSTNAME = os.getenv('HIVE_HOSTNAME', 'ec2-123-45-67-890.compute-1.amazonaws.com')
+
+# These values assume you set up the Hive data source following the link above.
+DYNAMODB_TABLE_HASH_KEY = 'feature_id'
+HIVE_SQL = 'SELECT feature_id, feature_name, feature_class, state_alpha FROM hive_features'
+
+
+@task
+def create_dynamodb_table():
+    client = DynamoDBHook(client_type='dynamodb').conn
+    client.create_table(
+        TableName=DYNAMODB_TABLE_NAME,
+        KeySchema=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'KeyType': 'HASH'},
+        ],
+        AttributeDefinitions=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'AttributeType': 'N'},
+        ],
+        ProvisionedThroughput={'ReadCapacityUnits': 20, 'WriteCapacityUnits': 20},
+    )
+
+    # DynamoDB table creation is nearly, but not quite, instantaneous.
+    # Wait for the table to be active to avoid race conditions writing to it.
+    waiter = client.get_waiter('table_exists')
+    waiter.wait(TableName=DYNAMODB_TABLE_NAME, WaiterConfig={'Delay': 1})
+
+
+@task
+def get_dynamodb_item_count():
+    """
+    A DynamoDB table has an ItemCount value, but it is only updated every six hours.
+    To verify this DAG worked, we will scan the table and count the items manually.
+    """
+    table = DynamoDBHook(resource_type='dynamodb').conn.Table(DYNAMODB_TABLE_NAME)
+
+    response = table.scan(Select='COUNT')
+    item_count = response['Count']
+
+    while 'LastEvaluatedKey' in response:
+        response = table.scan(Select='COUNT', ExclusiveStartKey=response['LastEvaluatedKey'])
+        item_count += response['Count']
+
+    print(f'DynamoDB table contains {item_count} items.')
+
+
+# Included for sample purposes only; in production you wouldn't delete your fresh backup.
+def delete_dynamodb_table_fn():
+    DynamoDBHook(client_type='dynamodb').conn.delete_table(TableName=DYNAMODB_TABLE_NAME)
+
+
+# Included for sample purposes only; in production this should
+# be configured in the environment and not be part of the DAG.
+# Note: The 'hiveserver2_default' connection will not work if Hive
+# is hosted on EMR.  You must set the host name of the connection
+# to match your EMR cluster's hostname.
+@task
+def configure_hive_connection():
+    db.merge_conn(
+        Connection(
+            conn_id=HIVE_CONNECTION_ID,
+            conn_type='hiveserver2',
+            host=HIVE_HOSTNAME,
+            port=10000,
+        )
+    )
+
+
+with DAG(
+    dag_id='example_hive_to_dynamodb',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as dag:
+    # Add the prerequisites docstring to the DAG in the UI.
+    dag.doc_md = __doc__
+
+    # [START howto_transfer_hive_to_dynamodb]
+    backup_to_dynamodb = HiveToDynamoDBOperator(
+        task_id='backup_to_dynamodb',
+        hiveserver2_conn_id=HIVE_CONNECTION_ID,
+        sql=HIVE_SQL,
+        table_name=DYNAMODB_TABLE_NAME,
+        table_keys=[DYNAMODB_TABLE_HASH_KEY],
+    )
+    # [END howto_transfer_hive_to_dynamodb]
+
+    # Using 'all_done' so even if an intermediate step fails, the DAG will clean up after itself.
+    delete_dynamodb_table = PythonOperator(
+        task_id="delete_dynamodb_table",
+        python_callable=delete_dynamodb_table_fn,
+        trigger_rule='all_done',
+    )

Review comment:
       Wasn't that because using `chain()` with the TaskFlow API was not compatible with Airflow 2.1? You should be able to pass a `trigger_rule` directly to the decorator, like you're trying to do, with Airflow 2.1+.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#issuecomment-1085077101


   @ferruzzi - your adventure was very, very little. Read THIS https://github.com/apache/airflow/pull/22548#issuecomment-1085016498


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ferruzzi commented on pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#issuecomment-1082270667


   Rebased and switched the chain() notation to bitwise shift operators for 2.1 compatibility.
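
   For reference, a minimal self-contained sketch (placeholder task ids, not the exact PR code) of the two notations being swapped; the bitwise shift form avoids the `chain()` helper:

   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.operators.dummy import DummyOperator

   with DAG(
       dag_id='ordering_notation_sketch',
       schedule_interval=None,
       start_date=datetime(2021, 1, 1),
       catchup=False,
   ) as dag:
       setup = DummyOperator(task_id='setup')
       transfer = DummyOperator(task_id='transfer')
       cleanup = DummyOperator(task_id='cleanup', trigger_rule='all_done')

       # chain() helper form (requires: from airflow.models.baseoperator import chain):
       # chain(setup, transfer, cleanup)

       # Equivalent bitwise shift form, as used after the rebase:
       setup >> transfer >> cleanup
   ```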


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ferruzzi commented on a change in pull request #22517: Adds HiveToDynamoDB Transfer Sample DAG and Docs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #22517:
URL: https://github.com/apache/airflow/pull/22517#discussion_r838786019



##########
File path: airflow/providers/amazon/aws/example_dags/example_hive_to_dynamodb.py
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+   This DAG will not work unless you create an Amazon EMR cluster running
+   Apache Hive and copy data into it following steps 1-4 (inclusive) here:
+   https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.html
+"""
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Connection
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
+from airflow.providers.amazon.aws.transfers.hive_to_dynamodb import HiveToDynamoDBOperator
+from airflow.utils import db
+
+DYNAMODB_TABLE_NAME = 'example_hive_to_dynamodb_table'
+HIVE_CONNECTION_ID = os.getenv('HIVE_CONNECTION_ID', 'hive_on_emr')
+HIVE_HOSTNAME = os.getenv('HIVE_HOSTNAME', 'ec2-123-45-67-890.compute-1.amazonaws.com')
+
+# These values assume you set up the Hive data source following the link above.
+DYNAMODB_TABLE_HASH_KEY = 'feature_id'
+HIVE_SQL = 'SELECT feature_id, feature_name, feature_class, state_alpha FROM hive_features'
+
+
+@task
+def create_dynamodb_table():
+    client = DynamoDBHook(client_type='dynamodb').conn
+    client.create_table(
+        TableName=DYNAMODB_TABLE_NAME,
+        KeySchema=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'KeyType': 'HASH'},
+        ],
+        AttributeDefinitions=[
+            {'AttributeName': DYNAMODB_TABLE_HASH_KEY, 'AttributeType': 'N'},
+        ],
+        ProvisionedThroughput={'ReadCapacityUnits': 20, 'WriteCapacityUnits': 20},
+    )
+
+    # DynamoDB table creation is nearly, but not quite, instantaneous.
+    # Wait for the table to be active to avoid race conditions writing to it.
+    waiter = client.get_waiter('table_exists')
+    waiter.wait(TableName=DYNAMODB_TABLE_NAME, WaiterConfig={'Delay': 1})
+
+
+@task
+def get_dynamodb_item_count():
+    """
+    A DynamoDB table has an ItemCount value, but it is only updated every six hours.
+    To verify this DAG worked, we will scan the table and count the items manually.
+    """
+    table = DynamoDBHook(resource_type='dynamodb').conn.Table(DYNAMODB_TABLE_NAME)
+
+    response = table.scan(Select='COUNT')
+    item_count = response['Count']
+
+    while 'LastEvaluatedKey' in response:
+        response = table.scan(Select='COUNT', ExclusiveStartKey=response['LastEvaluatedKey'])
+        item_count += response['Count']
+
+    print(f'DynamoDB table contains {item_count} items.')
+
+
+# Included for sample purposes only; in production you wouldn't delete your fresh backup.
+def delete_dynamodb_table_fn():
+    DynamoDBHook(client_type='dynamodb').conn.delete_table(TableName=DYNAMODB_TABLE_NAME)
+
+
+# Included for sample purposes only; in production this should
+# be configured in the environment and not be part of the DAG.
+# Note: The 'hiveserver2_default' connection will not work if Hive
+# is hosted on EMR.  You must set the host name of the connection
+# to match your EMR cluster's hostname.
+@task
+def configure_hive_connection():
+    db.merge_conn(
+        Connection(
+            conn_id=HIVE_CONNECTION_ID,
+            conn_type='hiveserver2',
+            host=HIVE_HOSTNAME,
+            port=10000,
+        )
+    )
+
+
+with DAG(
+    dag_id='example_hive_to_dynamodb',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as dag:
+    # Add the prerequisites docstring to the DAG in the UI.
+    dag.doc_md = __doc__
+
+    # [START howto_transfer_hive_to_dynamodb]
+    backup_to_dynamodb = HiveToDynamoDBOperator(
+        task_id='backup_to_dynamodb',
+        hiveserver2_conn_id=HIVE_CONNECTION_ID,
+        sql=HIVE_SQL,
+        table_name=DYNAMODB_TABLE_NAME,
+        table_keys=[DYNAMODB_TABLE_HASH_KEY],
+    )
+    # [END howto_transfer_hive_to_dynamodb]
+
+    # Using 'all_done' so even if an intermediate step fails, the DAG will clean up after itself.
+    delete_dynamodb_table = PythonOperator(
+        task_id="delete_dynamodb_table",
+        python_callable=delete_dynamodb_table_fn,
+        trigger_rule='all_done',
+    )

Review comment:
       Alright, I'll revert that and see. Maybe I just misread the error message. I'm in a meeting, but should have it pushed within the hour.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org