Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/04 13:39:54 UTC

[GitHub] [airflow] ashb commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

ashb commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r551319501



##########
File path: tests/providers/apache/kafka/sensors/__init__.py
##########
@@ -0,0 +1,17 @@
+#

Review comment:
       You've got no tests under here -- please add some :)

##########
File path: tests/providers/apache/kafka/hooks/__init__.py
##########
@@ -0,0 +1,17 @@
+#

Review comment:
       No tests here either - please add some.
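
As a rough idea of what a first hook test could look like, here is a minimal sketch. `ProducerHookSketch` and `resolve_server` are hypothetical stand-ins, modelled on the host/port fallback in the producer hook from this pull request, so the example runs without Airflow or a Kafka broker. A real test would import the actual hook and patch its `get_connection` instead.

```python
import unittest
from types import SimpleNamespace
from unittest import mock


class ProducerHookSketch:
    """Hypothetical, simplified hook used only to illustrate the test shape."""

    DEFAULT_HOST = "localhost"
    DEFAULT_PORT = 9092

    def __init__(self, conn_id):
        self.conn_id = conn_id
        self.server = None

    def get_connection(self, conn_id):
        # In Airflow this lookup comes from BaseHook; here it is a placeholder.
        raise NotImplementedError

    def resolve_server(self):
        # Build "host:port", falling back to the class defaults.
        conn = self.get_connection(self.conn_id)
        host = conn.host or self.DEFAULT_HOST
        port = conn.port or self.DEFAULT_PORT
        self.server = f"{host}:{port}"
        return self.server


class TestProducerHookSketch(unittest.TestCase):
    def test_uses_connection_host_and_port(self):
        conn = SimpleNamespace(host="broker", port=9093)
        with mock.patch.object(ProducerHookSketch, "get_connection", return_value=conn):
            hook = ProducerHookSketch("kafka_default")
            self.assertEqual(hook.resolve_server(), "broker:9093")

    def test_falls_back_to_defaults(self):
        conn = SimpleNamespace(host=None, port=None)
        with mock.patch.object(ProducerHookSketch, "get_connection", return_value=conn):
            hook = ProducerHookSketch("kafka_default")
            self.assertEqual(hook.resolve_server(), "localhost:9092")


# Run the two tests programmatically so the sketch is executable as a script.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestProducerHookSketch)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

Patching `get_connection` keeps the test independent of the Airflow metadata database, which is why the sketch mocks it rather than creating a real connection.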

##########
File path: airflow/providers/apache/kafka/provider.yaml
##########
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-apache-kafka
+description: |
+    `Apache Kafka <http://kafka.apache.org/>`__.
+
+versions:
+  - 2.0.0b2

Review comment:
       
   ```suggestion
     - 1.0.0
   ```
   
   This is the provider version, not the Airflow version.

##########
File path: airflow/providers/apache/kafka/hooks/kafka_producer.py
##########
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from kafka import KafkaProducer
+
+from airflow.hooks.base_hook import BaseHook
+
+
+class KafkaProducerHook(BaseHook):
+    """KafkaProducerHook Class."""
+
+    DEFAULT_HOST = 'localhost'
+    DEFAULT_PORT = 9092
+
+    def __init__(self, conn_id):
+        """
+            Initializes an instance of the Kafka Producer Hook class.
+        :param conn_id

Review comment:
       ```suggestion
           Initializes an instance of the Kafka Producer Hook class.
           
           :param conn_id:
   ```

##########
File path: airflow/providers/apache/kafka/README.md
##########
@@ -0,0 +1,68 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+
+# Package apache-airflow-providers-apache-kafka
+
+Release: 2.0.0b2
+
+**Table of contents**
+
+- [Provider package](#provider-package)
+- [Installation](#installation)
+- [PIP requirements](#pip-requirements)
+- [Provider class summary](#provider-classes-summary)
+    - [Sensors](#sensors)
+    - [Hooks](#hooks)
+- [Releases](#releases)
+
+## Provider package
+
+This is a provider package for `apache.kafka` provider. All classes for this provider package
+are in `airflow.providers.apache.kafka` python package.
+
+
+
+## Installation
+
+You can install this package on top of an existing airflow 2.* installation via
+`pip install apache-airflow-providers-apache-kafka`
+
+## PIP requirements
+
+| PIP package      | Version required   |
+|:-----------------|:-------------------|
+| kafka | &gt;=1.3.5   |
+
+# Provider classes summary
+
+In Airflow 2.0, all operators, transfers, hooks, sensors, secrets for the `apache.kafka` provider
+are in the `airflow.providers.apache.kafka` package. You can read more about the naming conventions used
+in [Naming conventions for provider packages](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#naming-conventions-for-provider-packages)

Review comment:
       This isn't needed as this is a new module. Remove this section please.

##########
File path: airflow/providers/apache/kafka/hooks/kafka_producer.py
##########
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from kafka import KafkaProducer
+
+from airflow.hooks.base_hook import BaseHook
+
+
+class KafkaProducerHook(BaseHook):
+    """KafkaProducerHook Class."""
+
+    DEFAULT_HOST = 'localhost'
+    DEFAULT_PORT = 9092
+
+    def __init__(self, conn_id):
+        """
+            Initializes an instance of the Kafka Producer Hook class.
+        :param conn_id
+            The airflow connection ID to use.
+        """
+        super().__init__(None)
+        self.conn_id = conn_id
+        self._conn = None
+        self.server = None
+        self.producer = None
+
+    def get_conn(self) -> KafkaProducer:
+        """
+            Returns a Kafka Producer
+
+        :return:
+            A Kafka Producer object.
+        """
+        if not self._conn:

Review comment:
       `self._conn` is never set, so this check is always true. I think you mean:
   
   ```suggestion
           if not self.producer:
   ```
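
To illustrate why the guard should test the attribute that actually gets assigned, here is a minimal sketch of the lazy-initialisation pattern. `FakeProducer` and `ProducerHookSketch` are hypothetical stand-ins (not `kafka.KafkaProducer` or the hook from this pull request), and the Airflow connection lookup is left out so the example runs on its own.

```python
class FakeProducer:
    """Hypothetical stand-in for a Kafka producer; counts how often it is built."""

    instances = 0

    def __init__(self, bootstrap_servers):
        FakeProducer.instances += 1
        self.bootstrap_servers = bootstrap_servers


class ProducerHookSketch:
    """Simplified hook: only the caching logic, no Airflow connection lookup."""

    def __init__(self, host="localhost", port=9092):
        self.server = f"{host}:{port}"
        self.producer = None

    def get_conn(self):
        # Guard on the attribute assigned below. Guarding on an attribute that
        # is never assigned would rebuild the producer on every call.
        if self.producer is None:
            self.producer = FakeProducer(bootstrap_servers=self.server)
        return self.producer


hook = ProducerHookSketch()
first = hook.get_conn()
second = hook.get_conn()
print(first is second)
```

With the guard on `self.producer`, repeated `get_conn()` calls hand back the same object, which is presumably the behaviour the original check was aiming for.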

##########
File path: airflow/providers/apache/kafka/hooks/kafka_consumer.py
##########
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from kafka import KafkaConsumer
+
+from airflow.hooks.base_hook import BaseHook
+
+
+class KafkaConsumerHook(BaseHook):
+    """KafkaConsumerHook Class."""
+
+    DEFAULT_HOST = 'kafka1'
+    DEFAULT_PORT = 9092
+
+    def __init__(self, topic, host=DEFAULT_HOST, port=DEFAULT_PORT, kafka_conn_id='kafka_default'):
+        super().__init__(None)
+        self.conn_id = kafka_conn_id
+        self._conn = None
+        self.server = None
+        self.consumer = None
+        self.extra_dejson = {}
+        self.topic = topic
+        self.host = host
+        self.port = port
+
+    def get_conn(self) -> KafkaConsumer:
+        """
+            A Kafka Consumer object.
+
+        :return:
+            A Kafka Consumer object.
+        """
+        if not self._conn:

Review comment:
       ```suggestion
           if not self.consumer:
   ```

##########
File path: airflow/providers/apache/kafka/hooks/kafka_producer.py
##########
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from kafka import KafkaProducer
+
+from airflow.hooks.base_hook import BaseHook
+
+
+class KafkaProducerHook(BaseHook):
+    """KafkaProducerHook Class."""
+
+    DEFAULT_HOST = 'localhost'
+    DEFAULT_PORT = 9092
+
+    def __init__(self, conn_id):
+        """
+            Initializes an instance of the Kafka Producer Hook class.
+        :param conn_id
+            The airflow connection ID to use.
+        """
+        super().__init__(None)
+        self.conn_id = conn_id
+        self._conn = None
+        self.server = None
+        self.producer = None
+
+    def get_conn(self) -> KafkaProducer:
+        """
+            Returns a Kafka Producer
+
+        :return:
+            A Kafka Producer object.
+        """
+        if not self._conn:
+            _conn = self.get_connection(self.conn_id)
+            service_options = _conn.extra_dejson
+            host = _conn.host or self.DEFAULT_HOST
+            port = _conn.port or self.DEFAULT_PORT
+
+            self.server = f"""{host}:{port}"""

Review comment:
       ```suggestion
               self.server = f"{host}:{port}"
   ```
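
A quick check that the suggestion changes only the quoting style and not the resulting value. The `host` and `port` values here are just the defaults from the hook, used for illustration.

```python
host, port = "localhost", 9092

# Triple-quoted and single-quoted f-strings produce the same string when the
# content sits on one line; triple quotes are only needed for multi-line text.
triple_quoted = f"""{host}:{port}"""
single_quoted = f"{host}:{port}"

print(triple_quoted == single_quoted)
```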



