You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ms...@apache.org on 2016/10/20 16:01:18 UTC

incubator-airflow git commit: [AIRFLOW-96] s3_conn_id using environment variable

Repository: incubator-airflow
Updated Branches:
  refs/heads/master a920035a9 -> f3af6f44e


[AIRFLOW-96] s3_conn_id using environment variable

Dear Airflow Maintainers,

Please accept this PR that addresses the following
issues:
- [AIRFLOW-96](https://issues.apache.org/jira/browse/AIRFLOW-96):
  allow the "s3_conn_id" parameter of S3KeySensor and
  S3PrefixSensor to be defined using an environment
  variable.

S3KeySensor and S3PrefixSensor use S3Hook, which
extends BaseHook. BaseHook.get_connection already
looks up a connection:
- in environment variables first
- and then in the database

The sensors' own database-only check for the conn_id
is therefore redundant and is removed.

Closes #1517 from dm-tran/fix-jira-airflow-96


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f3af6f44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f3af6f44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f3af6f44

Branch: refs/heads/master
Commit: f3af6f44eb4bb3fe0994d42ecc046abb314e4d63
Parents: a920035
Author: Duy-Minh TRAN <du...@adotmob.com>
Authored: Thu Oct 20 21:30:34 2016 +0530
Committer: Sumit Maheshwari <su...@qubole.com>
Committed: Thu Oct 20 21:30:34 2016 +0530

----------------------------------------------------------------------
 airflow/operators/sensors.py | 14 +-------------
 docs/concepts.rst            |  2 +-
 2 files changed, 2 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3af6f44/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 570b682..58040bc 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -26,7 +26,7 @@ from time import sleep
 import airflow
 from airflow import hooks, settings
 from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException
-from airflow.models import BaseOperator, TaskInstance, Connection as DB
+from airflow.models import BaseOperator, TaskInstance
 from airflow.hooks.base_hook import BaseHook
 from airflow.utils.state import State
 from airflow.utils.decorators import apply_defaults
@@ -446,10 +446,6 @@ class S3KeySensor(BaseSensorOperator):
             s3_conn_id='s3_default',
             *args, **kwargs):
         super(S3KeySensor, self).__init__(*args, **kwargs)
-        session = settings.Session()
-        db = session.query(DB).filter(DB.conn_id == s3_conn_id).first()
-        if not db:
-            raise AirflowException("conn_id doesn't exist in the repository")
         # Parse
         if bucket_name is None:
             parsed_url = urlparse(bucket_key)
@@ -465,8 +461,6 @@ class S3KeySensor(BaseSensorOperator):
         self.bucket_key = bucket_key
         self.wildcard_match = wildcard_match
         self.s3_conn_id = s3_conn_id
-        session.commit()
-        session.close()
 
     def poke(self, context):
         import airflow.hooks.S3_hook
@@ -506,18 +500,12 @@ class S3PrefixSensor(BaseSensorOperator):
             s3_conn_id='s3_default',
             *args, **kwargs):
         super(S3PrefixSensor, self).__init__(*args, **kwargs)
-        session = settings.Session()
-        db = session.query(DB).filter(DB.conn_id == s3_conn_id).first()
-        if not db:
-            raise AirflowException("conn_id doesn't exist in the repository")
         # Parse
         self.bucket_name = bucket_name
         self.prefix = prefix
         self.delimiter = delimiter
         self.full_url = "s3://" + bucket_name + '/' + prefix
         self.s3_conn_id = s3_conn_id
-        session.commit()
-        session.close()
 
     def poke(self, context):
         logging.info('Poking for prefix : {self.prefix}\n'

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3af6f44/docs/concepts.rst
----------------------------------------------------------------------
diff --git a/docs/concepts.rst b/docs/concepts.rst
index e27de26..9f65256 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -333,7 +333,7 @@ is named ``postgres_master`` the environment variable should be named
 ``AIRFLOW_CONN_POSTGRES_MASTER`` (note that the environment variable must be
 all uppercase). Airflow assumes the value returned from the environment
 variable to be in a URI format (e.g.
-``postgres://user:password@localhost:5432/master``).
+``postgres://user:password@localhost:5432/master`` or ``s3://accesskey:secretkey@S3``).
 
 Queues
 ======