You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/04 06:40:00 UTC
incubator-airflow git commit: [AIRFLOW-1030][AIRFLOW-1] Fix hook
import for HttpSensor
Repository: incubator-airflow
Updated Branches:
refs/heads/master 15aee05dd -> f2dae7d15
[AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor
Closes #2180 from
pdambrauskas/fix/http_hook_import
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f2dae7d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f2dae7d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f2dae7d1
Branch: refs/heads/master
Commit: f2dae7d15623e2534e7c0dab3b5a7e02d4cff81d
Parents: 15aee05
Author: pdambrauskas <p....@gmail.com>
Authored: Tue Apr 4 08:39:54 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Apr 4 08:39:54 2017 +0200
----------------------------------------------------------------------
airflow/operators/sensors.py | 24 +++++++++++++-----------
1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f2dae7d1/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 44a97e0..ae50bc5 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -25,12 +25,12 @@ from time import sleep
import re
import sys
-import airflow
-from airflow import hooks, settings
+from airflow import settings
from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException
from airflow.models import BaseOperator, TaskInstance
from airflow.hooks.base_hook import BaseHook
from airflow.hooks.hdfs_hook import HDFSHook
+from airflow.hooks.http_hook import HttpHook
from airflow.utils.state import State
from airflow.utils.decorators import apply_defaults
@@ -298,9 +298,9 @@ class NamedHivePartitionSensor(BaseSensorOperator):
raise ValueError('Could not parse ' + partition)
def poke(self, context):
-
if not hasattr(self, 'hook'):
- self.hook = hooks.HiveMetastoreHook(
+ from airflow.hooks.hive_hooks import HiveMetastoreHook
+ self.hook = HiveMetastoreHook(
metastore_conn_id=self.metastore_conn_id)
def poke_partition(partition):
@@ -369,7 +369,8 @@ class HivePartitionSensor(BaseSensorOperator):
'Poking for table {self.schema}.{self.table}, '
'partition {self.partition}'.format(**locals()))
if not hasattr(self, 'hook'):
- self.hook = hooks.HiveMetastoreHook(
+ from airflow.hooks.hive_hooks import HiveMetastoreHook
+ self.hook = HiveMetastoreHook(
metastore_conn_id=self.metastore_conn_id)
return self.hook.check_for_partition(
self.schema, self.table, self.partition)
@@ -470,7 +471,8 @@ class WebHdfsSensor(BaseSensorOperator):
self.webhdfs_conn_id = webhdfs_conn_id
def poke(self, context):
- c = airflow.hooks.webhdfs_hook.WebHDFSHook(self.webhdfs_conn_id)
+ from airflow.hooks.webhdfs_hook import WebHDFSHook
+ c = WebHDFSHook(self.webhdfs_conn_id)
logging.info(
'Poking for file {self.filepath} '.format(**locals()))
return c.check_for_path(hdfs_path=self.filepath)
@@ -520,8 +522,8 @@ class S3KeySensor(BaseSensorOperator):
self.s3_conn_id = s3_conn_id
def poke(self, context):
- import airflow.hooks.S3_hook
- hook = airflow.hooks.S3_hook.S3Hook(s3_conn_id=self.s3_conn_id)
+ from airflow.hooks.S3_hook import S3Hook
+ hook = S3Hook(s3_conn_id=self.s3_conn_id)
full_url = "s3://" + self.bucket_name + "/" + self.bucket_key
logging.info('Poking for key : {full_url}'.format(**locals()))
if self.wildcard_match:
@@ -567,8 +569,8 @@ class S3PrefixSensor(BaseSensorOperator):
def poke(self, context):
logging.info('Poking for prefix : {self.prefix}\n'
'in bucket s3://{self.bucket_name}'.format(**locals()))
- import airflow.hooks.S3_hook
- hook = airflow.hooks.S3_hook.S3Hook(s3_conn_id=self.s3_conn_id)
+ from airflow.hooks.S3_hook import S3Hook
+ hook = S3Hook(s3_conn_id=self.s3_conn_id)
return hook.check_for_prefix(
prefix=self.prefix,
delimiter=self.delimiter,
@@ -660,7 +662,7 @@ class HttpSensor(BaseSensorOperator):
self.extra_options = extra_options or {}
self.response_check = response_check
- self.hook = hooks.http_hook.HttpHook(method='GET', http_conn_id=http_conn_id)
+ self.hook = HttpHook(method='GET', http_conn_id=http_conn_id)
def poke(self, context):
logging.info('Poking: ' + self.endpoint)