Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/04 06:40:00 UTC

incubator-airflow git commit: [AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 15aee05dd -> f2dae7d15


[AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor

Closes #2180 from pdambrauskas/fix/http_hook_import
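
Background: the sensor previously looked its hook up by attribute access
(hooks.http_hook.HttpHook), which raises an AttributeError when the
http_hook submodule has not been imported elsewhere. A minimal sketch of
the failure mode and the fix; the 'http_default' connection id is just
the stock default, not something this commit adds:

    # Before: attribute-style lookup on the airflow.hooks package
    from airflow import hooks
    hook = hooks.http_hook.HttpHook(method='GET', http_conn_id='http_default')
    # -> AttributeError: 'module' object has no attribute 'http_hook'

    # After: import the hook class explicitly
    from airflow.hooks.http_hook import HttpHook
    hook = HttpHook(method='GET', http_conn_id='http_default')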


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f2dae7d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f2dae7d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f2dae7d1

Branch: refs/heads/master
Commit: f2dae7d15623e2534e7c0dab3b5a7e02d4cff81d
Parents: 15aee05
Author: pdambrauskas <p....@gmail.com>
Authored: Tue Apr 4 08:39:54 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Apr 4 08:39:54 2017 +0200

----------------------------------------------------------------------
 airflow/operators/sensors.py | 24 +++++++++++++-----------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
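
Beyond HttpSensor, the patch converts the remaining hooks.* attribute
lookups to explicit imports and moves the Hive, WebHDFS and S3 hook
imports inside poke(), presumably so that merely importing sensors.py
does not require those optional dependencies to be installed. A minimal
sketch of the deferred-import pattern, following the S3KeySensor hunk
below:

    def poke(self, context):
        # Deferred import: the S3 dependency is only needed when the
        # sensor actually pokes, not when sensors.py is imported.
        from airflow.hooks.S3_hook import S3Hook
        hook = S3Hook(s3_conn_id=self.s3_conn_id)
        return hook.check_for_key(self.bucket_key, self.bucket_name)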


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f2dae7d1/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 44a97e0..ae50bc5 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -25,12 +25,12 @@ from time import sleep
 import re
 import sys
 
-import airflow
-from airflow import hooks, settings
+from airflow import settings
 from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException
 from airflow.models import BaseOperator, TaskInstance
 from airflow.hooks.base_hook import BaseHook
 from airflow.hooks.hdfs_hook import HDFSHook
+from airflow.hooks.http_hook import HttpHook
 from airflow.utils.state import State
 from airflow.utils.decorators import apply_defaults
 
@@ -298,9 +298,9 @@ class NamedHivePartitionSensor(BaseSensorOperator):
             raise ValueError('Could not parse ' + partition)
 
     def poke(self, context):
-
         if not hasattr(self, 'hook'):
-            self.hook = hooks.HiveMetastoreHook(
+            from airflow.hooks.hive_hooks import HiveMetastoreHook
+            self.hook = HiveMetastoreHook(
                 metastore_conn_id=self.metastore_conn_id)
 
         def poke_partition(partition):
@@ -369,7 +369,8 @@ class HivePartitionSensor(BaseSensorOperator):
             'Poking for table {self.schema}.{self.table}, '
             'partition {self.partition}'.format(**locals()))
         if not hasattr(self, 'hook'):
-            self.hook = hooks.HiveMetastoreHook(
+            from airflow.hooks.hive_hooks import HiveMetastoreHook
+            self.hook = HiveMetastoreHook(
                 metastore_conn_id=self.metastore_conn_id)
         return self.hook.check_for_partition(
             self.schema, self.table, self.partition)
@@ -470,7 +471,8 @@ class WebHdfsSensor(BaseSensorOperator):
         self.webhdfs_conn_id = webhdfs_conn_id
 
     def poke(self, context):
-        c = airflow.hooks.webhdfs_hook.WebHDFSHook(self.webhdfs_conn_id)
+        from airflow.hooks.webhdfs_hook import WebHDFSHook
+        c = WebHDFSHook(self.webhdfs_conn_id)
         logging.info(
             'Poking for file {self.filepath} '.format(**locals()))
         return c.check_for_path(hdfs_path=self.filepath)
@@ -520,8 +522,8 @@ class S3KeySensor(BaseSensorOperator):
         self.s3_conn_id = s3_conn_id
 
     def poke(self, context):
-        import airflow.hooks.S3_hook
-        hook = airflow.hooks.S3_hook.S3Hook(s3_conn_id=self.s3_conn_id)
+        from airflow.hooks.S3_hook import S3Hook
+        hook = S3Hook(s3_conn_id=self.s3_conn_id)
         full_url = "s3://" + self.bucket_name + "/" + self.bucket_key
         logging.info('Poking for key : {full_url}'.format(**locals()))
         if self.wildcard_match:
@@ -567,8 +569,8 @@ class S3PrefixSensor(BaseSensorOperator):
     def poke(self, context):
         logging.info('Poking for prefix : {self.prefix}\n'
                      'in bucket s3://{self.bucket_name}'.format(**locals()))
-        import airflow.hooks.S3_hook
-        hook = airflow.hooks.S3_hook.S3Hook(s3_conn_id=self.s3_conn_id)
+        from airflow.hooks.S3_hook import S3Hook
+        hook = S3Hook(s3_conn_id=self.s3_conn_id)
         return hook.check_for_prefix(
             prefix=self.prefix,
             delimiter=self.delimiter,
@@ -660,7 +662,7 @@ class HttpSensor(BaseSensorOperator):
         self.extra_options = extra_options or {}
         self.response_check = response_check
 
-        self.hook = hooks.http_hook.HttpHook(method='GET', http_conn_id=http_conn_id)
+        self.hook = HttpHook(method='GET', http_conn_id=http_conn_id)
 
     def poke(self, context):
         logging.info('Poking: ' + self.endpoint)
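
For reference, a minimal sketch of the now-working HttpSensor in a DAG;
the dag id, dates, endpoint and response check below are illustrative
assumptions, not taken from this commit:

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.sensors import HttpSensor

    dag = DAG('http_sensor_example',
              start_date=datetime(2017, 4, 1),
              schedule_interval='@daily')

    wait_for_api = HttpSensor(
        task_id='wait_for_api',
        http_conn_id='http_default',  # stock HTTP connection id
        endpoint='health',            # assumed endpoint on that connection
        response_check=lambda response: 'ok' in response.text,
        poke_interval=60,             # seconds between pokes
        timeout=600,                  # give up after ten minutes
        dag=dag)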