Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/11/16 19:55:47 UTC

[GitHub] [airflow] dferguson992 opened a new pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

dferguson992 opened a new pull request #12388:
URL: https://github.com/apache/airflow/pull/12388


   Dear Airflow Maintainers,
   
   Please accept the following PR, which:
   
   Adds the KafkaProducerHook.
   Adds the KafkaConsumerHook.
   Adds the KafkaSensor, which listens for messages on a specific topic.
   Related Issue:
   #1311
   
   Issue link: AIRFLOW-6786
   
   Make sure to mark the boxes below before creating PR: [x]
   
   Description above provides context of the change
   Commit message/PR title starts with [AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID*
   Unit tests coverage for changes (not needed for documentation changes)
   Commits follow "How to write a good git commit message"
   Relevant documentation is updated including usage instructions.
   I will engage committers as explained in Contribution Workflow Example.
   For document-only changes commit message can start with [AIRFLOW-XXXX].
   Reminder to contributors:
   
   You must add an Apache License header to all new files
   Please squash your commits when possible and follow the 7 rules of good Git commits
   I am new to the community, so I am not sure whether the files are in the right place or whether anything is missing.
   
   The sensor could be used as the first node of a DAG, where the second node can be a TriggerDagRunOperator. The messages are polled in a batch and the DAG runs are generated dynamically.
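
The batch-polling pattern described above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: `poll_batch`, `Record`, and `FakeConsumer` are hypothetical names, and the only real API assumed is kafka-python's `KafkaConsumer.poll(timeout_ms=..., max_records=...)`, which returns a dict mapping partitions to lists of records. Each returned dict is meant to serve as the `conf` of one triggered DAG run.

```python
from collections import namedtuple

# Stand-in for kafka-python's ConsumerRecord; only the fields used here.
Record = namedtuple("Record", ["topic", "offset", "value"])


class FakeConsumer:
    """Hypothetical test double that mimics KafkaConsumer.poll()."""

    def __init__(self, batches):
        self._batches = list(batches)

    def poll(self, timeout_ms=0, max_records=None):
        # Like KafkaConsumer.poll(): returns {partition: [records]}, or {} if idle.
        return self._batches.pop(0) if self._batches else {}


def poll_batch(consumer, timeout_ms=1000, max_records=100):
    """Poll once and return one DAG-run conf dict per message received."""
    batch = consumer.poll(timeout_ms=timeout_ms, max_records=max_records)
    confs = []
    for records in batch.values():
        for record in records:
            confs.append({"topic": record.topic, "offset": record.offset, "value": record.value})
    return confs


consumer = FakeConsumer([{("events", 0): [Record("events", 7, b"a"), Record("events", 8, b"b")]}])
confs = poll_batch(consumer)
```

A sensor's `poke` method could return `True` whenever `confs` is non-empty, and a downstream task could then create one DAG run per entry.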
   
   Thanks!
   
   Note: as discussed in the declined PR #1415, it is important to mention that these integrations are not suitable for low-latency, high-throughput, or streaming use cases. For reference, see #1415 (comment).
   
   Co-authored-by: Dan Ferguson dferguson992@gmail.com
   Co-authored-by: YuanfΞi Zhu


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-728368333


   [The Workflow run](https://github.com/apache/airflow/actions/runs/366952705) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] debashis-das commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
debashis-das commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-987056630


   @potiuk I have the same requirement. If I am able to implement it, I will raise a PR.





[GitHub] [airflow] github-actions[bot] commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-754037983


   [The Workflow run](https://github.com/apache/airflow/actions/runs/461274053) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-906824992


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.





[GitHub] [airflow] buffaloDeveloper commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
buffaloDeveloper commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-877954947


   > > My suggestion, as an interim solution, is to spin up a container running a faust app, and have it read the messages and then post to the airflow api to trigger a job.
   > 
   > Would using a 'confluent-kafka' consumer instead do the trick?
   
   I don't think so. You could use that to create a Kafka consumer, but it will not work like an Airflow sensor. It's a shame this sensor is not ready yet; as a workaround, I'm looking at using either Faust (as suggested above) or Kafka Streaming.
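
The interim workaround discussed above (an external consumer that triggers a DAG run through the Airflow API) might look roughly like this. It is a sketch under assumptions: it targets the Airflow 2 stable REST API endpoint `POST /api/v1/dags/{dag_id}/dagRuns`, the base URL and DAG id are placeholders, authentication is omitted, and `build_trigger_request` is a hypothetical helper. The request is only constructed here, not sent.

```python
import json
import urllib.request


def build_trigger_request(base_url, dag_id, message_value):
    """Build (but do not send) a request that triggers one DAG run for a message."""
    url = f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns"
    # The message payload is passed to the DAG run through its `conf`.
    body = json.dumps({"conf": {"message": message_value}}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder host and DAG id, for illustration only.
request = build_trigger_request("http://localhost:8080/", "kafka_triggered_dag", "hello")
# A consumer loop would call urllib.request.urlopen(request) once per message,
# after adding whatever authentication the deployment requires.
```

Whichever consumer library reads the topic, the triggering side stays the same: one HTTP request per message or per batch.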





[GitHub] [airflow] dferguson992 commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
dferguson992 commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-748620433


   @rotten to my knowledge, yes. Now it's so outdated, and the structure of this project seems to change overnight. I missed the 2.0 release and frankly I'm just so over it. This community is great, but contributing to this project while you work and have other obligations, even if it's literally three tiny objects, is really difficult for me.





[GitHub] [airflow] mik-laj commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r524662254



##########
File path: docs/installation.rst
##########
@@ -125,18 +125,188 @@ yum package, or whatever equivalent applies on the distribution you are using.
 Behind the scenes, Airflow does conditional imports of operators that require
 these extra dependencies.
 
-For the list of the subpackages and what they enable, see: :doc:`extra-packages-ref`.
-
-Provider packages
-'''''''''''''''''
-
-Unlike Apache Airflow 1.10, the Airflow 2.0 is delivered in multiple, separate, but connected packages.
-The core of Airflow scheduling system is delivered as ``apache-airflow`` package and there are around
-60 providers packages which can be installed separately as so called "Airflow Provider packages".
-The default Airflow installation doesn't have many integrations and you have to install them yourself.
-For more information, see: :doc:`provider-packages`
-
-For the list of the provider packages and what they enable, see: :doc:`provider-packages-ref`.
+Here's the list of the subpackages and what they enable:
+
+
+**Fundamentals:**

Review comment:
       This table is now in the extra-packages-ref.rst file







[GitHub] [airflow] dferguson992 commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
dferguson992 commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r526097929



##########
File path: airflow/providers/apache/kafka/hooks/kafka_producer.py
##########
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from kafka import KafkaProducer
+
+from airflow.hooks.base_hook import BaseHook
+
+
+class KafkaProducerHook(BaseHook):
+    """KafkaProducerHook Class."""
+
+    DEFAULT_HOST = 'localhost'
+    DEFAULT_PORT = 9092
+
+    def __init__(self, conn_id, topic):
+        super().__init__(None)
+        self.conn_id = conn_id
+        self._conn = None
+        self.server = None
+        self.consumer = None
+        self.producer = None
+        self.topic = topic
+
+    def get_conn(self) -> KafkaProducer:
+        """
+            Returns a Kafka Producer
+
+        :return:
+            A Kafka Producer object.
+        """
+        if not self._conn:
+            _conn = self.get_connection(self.conn_id)
+            service_options = _conn.extra_dejson
+            host = _conn.host or self.DEFAULT_HOST
+            port = _conn.port or self.DEFAULT_PORT
+
+            self.server = f"""{host}:{port}"""
+            self.consumer = KafkaProducer(bootstrap_servers=self.server, **service_options)
+        return self.producer
+
+    def send_message(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
+        """
+            Sends a message on the specified topic and partition.  Keyed messages will be sent in order.
+
+        :param topic:

Review comment:
       They should be the same, but it is not necessary to add them twice; I've removed it from `__init__`.







[GitHub] [airflow] mik-laj commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r525544082



##########
File path: docs/operators-and-hooks-ref.rst
##########
@@ -147,9 +147,91 @@ Software operators and hooks
 These integrations allow you to perform various operations within software developed by Apache Software
 Foundation.
 
-.. operators-hooks-ref::
-   :tags: apache
-   :header-separator: "
+.. list-table::

Review comment:
       This table is generated automatically. You don't have to change it. Can you undo this change?








[GitHub] [airflow] DestroyerAlpha commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
DestroyerAlpha commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-873211906


   > My suggestion, as an interim solution, is to spin up a container running a faust app, and have it read the messages and then post to the airflow api to trigger a job.
   
   Would using a 'confluent-kafka' consumer instead do the trick?





[GitHub] [airflow] mik-laj commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-755056286


   > I can't seem to get this to build locally now. pip install -e C:\...\airflow complains about
   
   Windows is not supported because Airflow has dependencies that use Unix system calls. See: 
    https://github.com/apache/airflow/issues/10388





[GitHub] [airflow] luup2k commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
luup2k commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-782756798


   Is there a good reason to use the kafka lib (https://pypi.org/project/kafka/)? It is a frozen release (see https://github.com/dpkp/kafka-python/issues/1726). If we wish to go with a pure-Python lib, I guess we'll need to use kafka-python (https://pypi.org/project/kafka-python/) instead.
   
   As a side topic: why don't we use confluent-kafka (https://github.com/confluentinc/confluent-kafka-python)? That lib binds librdkafka and has superior performance.





[GitHub] [airflow] potiuk commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-984991868


   Unbelievable (!) that you have not done it yet, @serge-salamanka-1pt!





[GitHub] [airflow] mik-laj commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r525542448



##########
File path: airflow/providers/apache/kafka/hooks/kafka_producer.py
##########
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from kafka import KafkaProducer
+
+from airflow.hooks.base_hook import BaseHook
+
+
+class KafkaProducerHook(BaseHook):
+    """KafkaProducerHook Class."""
+
+    DEFAULT_HOST = 'localhost'
+    DEFAULT_PORT = 9092
+
+    def __init__(self, conn_id, topic):

Review comment:
       Can you add docstrings? In particular, for the `topic` parameter.







[GitHub] [airflow] dferguson992 commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
dferguson992 commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r526090569



##########
File path: airflow/providers/apache/kafka/hooks/kafka_producer.py
##########
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from kafka import KafkaProducer
+
+from airflow.hooks.base_hook import BaseHook
+
+
+class KafkaProducerHook(BaseHook):
+    """KafkaProducerHook Class."""
+
+    DEFAULT_HOST = 'localhost'
+    DEFAULT_PORT = 9092
+
+    def __init__(self, conn_id, topic):
+        super().__init__(None)
+        self.conn_id = conn_id
+        self._conn = None
+        self.server = None
+        self.consumer = None
+        self.producer = None
+        self.topic = topic
+
+    def get_conn(self) -> KafkaProducer:
+        """
+            Returns a Kafka Producer
+
+        :return:
+            A Kafka Producer object.
+        """
+        if not self._conn:

Review comment:
       The `_conn` attribute is set on line 47 using the base_hook `get_connection` method.  This expression will be true once during the first call to `get_conn` where it is initialized on lines 47 - 53.  Thereafter, it returns the KafkaProducer object.
   
   On that note, I've changed line 53 to say `self.producer =` and removed the `consumer` attribute entirely







[GitHub] [airflow] kubatyszko commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
kubatyszko commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-753283848


   @ashb do you need any help with this? @rotten and I are happy to provide assistance.





[GitHub] [airflow] mik-laj commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r524666663



##########
File path: docs/autoapi_templates/index.rst
##########
@@ -86,6 +86,10 @@ All operators are in the following packages:
 
   airflow/providers/apache/hive/transfers/index
 
+  airflow/providers/apache/kafka/hooks/index

Review comment:
       Can you look at the end of this document?







[GitHub] [airflow] flolas commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
flolas commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-853343842


   I'm interested in this PR! I can help if needed. @ashb @shkolnik 





[GitHub] [airflow] shkolnik commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
shkolnik commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-852368649


   @ashb Thanks!





[GitHub] [airflow] ashb commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r551319501



##########
File path: tests/providers/apache/kafka/sensors/__init__.py
##########
@@ -0,0 +1,17 @@
+#

Review comment:
       You've got no tests under here -- please add some :)

##########
File path: tests/providers/apache/kafka/hooks/__init__.py
##########
@@ -0,0 +1,17 @@
+#

Review comment:
       No tests here either - please add some.

##########
File path: airflow/providers/apache/kafka/provider.yaml
##########
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-apache-kafka
+description: |
+    `Apache Kafka <http://kafka.apache.org/>`__.
+
+versions:
+  - 2.0.0b2

Review comment:
       
   ```suggestion
     - 1.0.0
   ```
   
   This is the provider version, not the airflow version

##########
File path: airflow/providers/apache/kafka/hooks/kafka_producer.py
##########
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from kafka import KafkaProducer
+
+from airflow.hooks.base_hook import BaseHook
+
+
+class KafkaProducerHook(BaseHook):
+    """KafkaProducerHook Class."""
+
+    DEFAULT_HOST = 'localhost'
+    DEFAULT_PORT = 9092
+
+    def __init__(self, conn_id):
+        """
+            Initializes an instance of the Kafka Producer Hook class.
+        :param conn_id

Review comment:
       ```suggestion
           Initializes an instance of the Kafka Producer Hook class.
           
           :param conn_id:
   ```

##########
File path: airflow/providers/apache/kafka/README.md
##########
@@ -0,0 +1,68 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+
+# Package apache-airflow-providers-apache-kafka
+
+Release: 2.0.0b2
+
+**Table of contents**
+
+- [Provider package](#provider-package)
+- [Installation](#installation)
+- [PIP requirements](#pip-requirements)
+- [Provider class summary](#provider-classes-summary)
+    - [Sensors](#sensors)
+    - [Hooks](#hooks)
+- [Releases](#releases)
+
+## Provider package
+
+This is a provider package for `apache.kafka` provider. All classes for this provider package
+are in `airflow.providers.apache.kafka` python package.
+
+
+
+## Installation
+
+You can install this package on top of an existing airflow 2.* installation via
+`pip install apache-airflow-providers-apache-kafka`
+
+## PIP requirements
+
+| PIP package      | Version required   |
+|:-----------------|:-------------------|
+| kafka | &gt;=1.3.5   |
+
+# Provider classes summary
+
+In Airflow 2.0, all operators, transfers, hooks, sensors, secrets for the `apache.kafka` provider
+are in the `airflow.providers.apache.kafka` package. You can read more about the naming conventions used
+in [Naming conventions for provider packages](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#naming-conventions-for-provider-packages)

Review comment:
       This isn't needed as this is a new module. Remove this section please.

##########
File path: airflow/providers/apache/kafka/hooks/kafka_producer.py
##########
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from kafka import KafkaProducer
+
+from airflow.hooks.base_hook import BaseHook
+
+
+class KafkaProducerHook(BaseHook):
+    """KafkaProducerHook Class."""
+
+    DEFAULT_HOST = 'localhost'
+    DEFAULT_PORT = 9092
+
+    def __init__(self, conn_id):
+        """
+            Initializes an instance of the Kafka Producer Hook class.
+        :param conn_id
+            The airflow connection ID to use.
+        """
+        super().__init__(None)
+        self.conn_id = conn_id
+        self._conn = None
+        self.server = None
+        self.producer = None
+
+    def get_conn(self) -> KafkaProducer:
+        """
+            Returns a Kafka Producer
+
+        :return:
+            A Kafka Producer object.
+        """
+        if not self._conn:

Review comment:
       `self._conn` is never set. I think you mean
   
   ```suggestion
           if not self.producer:
   ```
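
The caching problem flagged here can be illustrated in isolation. In this sketch, which is not the PR's code, `LazyProducerHook`, `producer_factory`, and `fake_factory` are hypothetical names; the factory stands in for `KafkaProducer` so that the example runs without a broker. The point is that the guard and the assignment must refer to the same attribute, otherwise a new client is built on every call.

```python
class LazyProducerHook:
    """Hypothetical hook that creates its client once and reuses it."""

    def __init__(self, server, producer_factory):
        self.server = server
        self.producer = None
        # Stand-in for kafka.KafkaProducer, injected so no broker is needed.
        self._producer_factory = producer_factory

    def get_conn(self):
        # Guard on the same attribute that is assigned below.
        if self.producer is None:
            self.producer = self._producer_factory(bootstrap_servers=self.server)
        return self.producer


calls = []


def fake_factory(bootstrap_servers):
    # Record each construction so repeated creation would be visible.
    calls.append(bootstrap_servers)
    return object()


hook = LazyProducerHook("localhost:9092", fake_factory)
first = hook.get_conn()
second = hook.get_conn()
```

Here the factory is invoked only once, and both calls to `get_conn` hand back the same object.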

##########
File path: airflow/providers/apache/kafka/hooks/kafka_consumer.py
##########
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from kafka import KafkaConsumer
+
+from airflow.hooks.base_hook import BaseHook
+
+
+class KafkaConsumerHook(BaseHook):
+    """KafkaConsumerHook Class."""
+
+    DEFAULT_HOST = 'kafka1'
+    DEFAULT_PORT = 9092
+
+    def __init__(self, topic, host=DEFAULT_HOST, port=DEFAULT_PORT, kafka_conn_id='kafka_default'):
+        super().__init__(None)
+        self.conn_id = kafka_conn_id
+        self._conn = None
+        self.server = None
+        self.consumer = None
+        self.extra_dejson = {}
+        self.topic = topic
+        self.host = host
+        self.port = port
+
+    def get_conn(self) -> KafkaConsumer:
+        """
+            A Kafka Consumer object.
+
+        :return:
+            A Kafka Consumer object.
+        """
+        if not self._conn:

Review comment:
       ```suggestion
           if not self.consumer:
   ```

##########
File path: airflow/providers/apache/kafka/hooks/kafka_producer.py
##########
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from kafka import KafkaProducer
+
+from airflow.hooks.base_hook import BaseHook
+
+
+class KafkaProducerHook(BaseHook):
+    """KafkaProducerHook Class."""
+
+    DEFAULT_HOST = 'localhost'
+    DEFAULT_PORT = 9092
+
+    def __init__(self, conn_id):
+        """
+            Initializes an instance of the Kafka Producer Hook class.
+        :param conn_id
+            The airflow connection ID to use.
+        """
+        super().__init__(None)
+        self.conn_id = conn_id
+        self._conn = None
+        self.server = None
+        self.producer = None
+
+    def get_conn(self) -> KafkaProducer:
+        """
+            Returns a Kafka Producer
+
+        :return:
+            A Kafka Producer object.
+        """
+        if not self._conn:
+            _conn = self.get_connection(self.conn_id)
+            service_options = _conn.extra_dejson
+            host = _conn.host or self.DEFAULT_HOST
+            port = _conn.port or self.DEFAULT_PORT
+
+            self.server = f"""{host}:{port}"""

Review comment:
       ```suggestion
               self.server = f"{host}:{port}"
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] rotten commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
rotten commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-749027508


   The documentation conflicts are easy to resolve, and I'm happy to resolve them to see this move along.  I can either push the changes directly to your branch if you invite me to that project, or fork your fork and open a PR back to you, which you can then PR back to here.
   
   Or I can fork your repo, we can close this PR, and I can open a new PR as "try #4".  Let me know how you'd like to proceed.
   


----------------------------------------------------------------



[GitHub] [airflow] rotten commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
rotten commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-748176452


   Do you need help resolving the (documentation) merge conflicts?  Is that the only thing blocking this from getting merged?


----------------------------------------------------------------



[GitHub] [airflow] mik-laj commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r525543537



##########
File path: airflow/providers/apache/kafka/sensors/kafka.py
##########
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from cached_property import cached_property
+
+from airflow.providers.apache.kafka.hooks.kafka_consumer import KafkaConsumerHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class KafkaSensor(BaseSensorOperator):
+    """Consumes the Kafka message with the specific topic"""

Review comment:
       We try to keep the documentation for `__init__` as class documentation. Can you move it two lines up?
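
A rough illustration of the convention described here, assuming it means the constructor parameters are documented in the class docstring rather than in `__init__`. `TopicSensorSketch` and its parameter descriptions are hypothetical; whether this layout passes the project's pydocstyle configuration is not something this sketch settles.

```python
class TopicSensorSketch:
    """
    Waits for a message on a Kafka topic (hypothetical example class).

    :param topic: name of the topic to poll
    :param kafka_conn_id: ID of the Airflow connection to use
    """

    def __init__(self, topic, kafka_conn_id="kafka_default"):
        # No docstring here: the parameters are described on the class.
        self.topic = topic
        self.kafka_conn_id = kafka_conn_id
```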




----------------------------------------------------------------



[GitHub] [airflow] mik-laj commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-755056572


   > On the subject of testing, I wanted to add this dependency to assist in the mocking.
   https://pypi.org/project/pytest-kafka/
   I have no clue where to even begin adding this dependency to the project so it builds and runs tests appropriately.
   Any advice where to go with this?
   
   
   You should add this dependency to the "devel" extra. See: https://github.com/apache/airflow/blob/c2ead47e04468e815c0b0a9ec059b46f9c484de2/setup.py#L450


----------------------------------------------------------------



[GitHub] [airflow] dferguson992 commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
dferguson992 commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-729076267


   @mik-laj can I please get a review of this PR?  It has passed all of the tests and just needs 1 approving review.


----------------------------------------------------------------



[GitHub] [airflow] shkolnik commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
shkolnik commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-845453173


   @dferguson992 @luup2k @RosterIn @ashb @mik-laj this seems like a very useful capability and a lot of effort went into building it. What would it take to wrap it up and get it merged?


-- 



[GitHub] [airflow] mik-laj commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r524660404



##########
File path: docs/operators-and-hooks-ref.rst
##########
@@ -147,10 +147,91 @@ Software operators and hooks
 These integrations allow you to perform various operations within software developed by Apache Software
 Foundation.
 
-.. operators-hooks-ref::
-   :tags: apache
-   :header-separator: "
+.. list-table::

Review comment:
       Please don't do this. This table is now generated automatically from provider.yaml, but you have to assign the tags.




----------------------------------------------------------------



[GitHub] [airflow] dferguson992 commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
dferguson992 commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r526092240



##########
File path: airflow/providers/apache/kafka/sensors/kafka.py
##########
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from cached_property import cached_property
+
+from airflow.providers.apache.kafka.hooks.kafka_consumer import KafkaConsumerHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class KafkaSensor(BaseSensorOperator):
+    """Consumes the Kafka message with the specific topic"""

Review comment:
       No can do; it fails the pydocstyle checks.




----------------------------------------------------------------



[GitHub] [airflow] github-actions[bot] closed pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #12388:
URL: https://github.com/apache/airflow/pull/12388


   


-- 



[GitHub] [airflow] dferguson992 commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
dferguson992 commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r526097673



##########
File path: airflow/providers/apache/kafka/hooks/kafka_producer.py
##########
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from kafka import KafkaProducer
+
+from airflow.hooks.base_hook import BaseHook
+
+
+class KafkaProducerHook(BaseHook):
+    """KafkaProducerHook Class."""
+
+    DEFAULT_HOST = 'localhost'
+    DEFAULT_PORT = 9092
+
+    def __init__(self, conn_id, topic):

Review comment:
       I see now; yes, that parameter in `__init__` is not necessary.




----------------------------------------------------------------



[GitHub] [airflow] serge-salamanka-1pt commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
serge-salamanka-1pt commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-984473014


   Unbelievable (!) Airflow does not support Kafka out of the box yet!?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dferguson992 commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
dferguson992 commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-755051473


   I can't seem to get this to build locally now.  `pip install -e C:\...\airflow` fails with the error below, even though I've installed all of the VC++ libraries and SDKs, about 8GB worth.
   ERROR: Could not build wheels for setproctitle which use PEP 517 and cannot be installed directly
   
   I tried running `pip install setproctitle` manually and got the same error.
   ERROR: Could not build wheels for setproctitle which use PEP 517 and cannot be installed directly
   
   Any thoughts on this?
   
   On the subject of testing, I wanted to add this dependency to assist in the mocking.
   https://pypi.org/project/pytest-kafka/
   I have no clue where to even begin adding this dependency to the project so it builds and runs tests appropriately.
   Any advice where to go with this?


----------------------------------------------------------------



[GitHub] [airflow] mik-laj commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-755057876


   I have looked at this library and it seems to me that it might complicate the tests a bit. Have you thought about using unittest.mock for unit tests and setting up a separate Docker container with Kafka for integration tests? This sounds a lot more stable and more maintainable to me than adding a library which may have further dependency issues.
   
   Here is an example of adding integration tests that use a separate container.
   https://github.com/apache/airflow/pull/13195
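
As a rough sketch of the unittest.mock approach mentioned above, the client class can be replaced by a `MagicMock` so no broker is needed. `ConsumerHookSketch` and its `client_cls` parameter are hypothetical simplifications for illustration (the PR's hook imports `KafkaConsumer` directly and reads host and port from an Airflow connection); only the `unittest.mock` calls are real library API.

```python
from unittest import mock


class ConsumerHookSketch:
    """Hypothetical, simplified hook; client_cls stands in for kafka.KafkaConsumer."""

    def __init__(self, topic, host, port, client_cls):
        self.topic = topic
        self.server = f"{host}:{port}"
        self.client_cls = client_cls
        self.consumer = None

    def get_conn(self):
        # Build the client lazily and cache it on the attribute we check.
        if self.consumer is None:
            self.consumer = self.client_cls(self.topic, bootstrap_servers=self.server)
        return self.consumer


def test_get_conn_builds_client_once():
    # The mock records how it was called instead of opening a connection.
    fake_cls = mock.MagicMock(name="KafkaConsumer")
    hook = ConsumerHookSketch("events", "localhost", 9092, fake_cls)

    first = hook.get_conn()
    second = hook.get_conn()

    assert first is second
    fake_cls.assert_called_once_with("events", bootstrap_servers="localhost:9092")
```

In the real provider the same idea would more likely be expressed with `mock.patch` on the module-level import, which is an assumption about how the tests would be laid out.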


----------------------------------------------------------------



[GitHub] [airflow] github-actions[bot] commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-729021769


   [The Workflow run](https://github.com/apache/airflow/actions/runs/368443647) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------



[GitHub] [airflow] luup2k commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
luup2k commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r579772714



##########
File path: airflow/providers/apache/kafka/hooks/kafka_consumer.py
##########
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from kafka import KafkaConsumer
+
+from airflow.hooks.base_hook import BaseHook
+
+
+class KafkaConsumerHook(BaseHook):
+    """KafkaConsumerHook Class."""
+
+    DEFAULT_HOST = 'kafka1'
+    DEFAULT_PORT = 9092
+
+    def __init__(self, topic, host=DEFAULT_HOST, port=DEFAULT_PORT, kafka_conn_id='kafka_default'):
+        super().__init__(None)
+        self.conn_id = kafka_conn_id
+        self._conn = None
+        self.server = None
+        self.consumer = None
+        self.extra_dejson = {}
+        self.topic = topic
+        self.host = host
+        self.port = port
+
+    def get_conn(self) -> KafkaConsumer:
+        """
+            A Kafka Consumer object.
+
+        :return:
+            A Kafka Consumer object.
+        """
+        if not self.consumer:
+            conn = self.get_connection(self.conn_id)
+            service_options = conn.extra_dejson
+            host = conn.host or self.DEFAULT_HOST
+            port = conn.port or self.DEFAULT_PORT
+
+            self.server = f"""{host}:{port}"""
+            self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server, **service_options)
+        return self.consumer
+
+    def get_messages(self, timeout_ms=5000) -> dict:
+        """
+        Get all the messages haven't been consumed, it doesn't

Review comment:
       >  "Get all the messages haven't been consumed,"
   
   If we use poll() without max_records, it returns at most "max_poll_records" records. "max_poll_records" is set to 500 by default in the consumer's init config.
   
   So we're not going to consume "all" messages unless we set max_poll_records to a very high number (which could be a memory bomb) or the topic holds only a small number of messages.
   
   https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html#kafka.KafkaConsumer.poll
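
One possible way to handle this, sketched under assumptions: poll repeatedly in bounded batches until a poll comes back empty or a batch cap is reached. `poll_batches`, `max_batches`, and `FakeConsumer` are hypothetical names for illustration; the only thing taken from kafka-python is the shape of `poll(timeout_ms=..., max_records=...)`, which returns a dict mapping partitions to lists of records.

```python
def poll_batches(consumer, timeout_ms=5000, max_records=500, max_batches=10):
    """Collect records by polling until a poll returns nothing or the cap is hit."""
    collected = []
    for _ in range(max_batches):
        batch = consumer.poll(timeout_ms=timeout_ms, max_records=max_records)
        if not batch:
            break  # nothing arrived within the timeout
        for records in batch.values():
            collected.extend(records)
    return collected


class FakeConsumer:
    """In-memory stand-in used only to exercise poll_batches without a broker."""

    def __init__(self, records, partition="topic-0"):
        self._records = list(records)
        self._partition = partition

    def poll(self, timeout_ms=0, max_records=None):
        limit = len(self._records) if max_records is None else max_records
        chunk, self._records = self._records[:limit], self._records[limit:]
        return {self._partition: chunk} if chunk else {}
```

The `max_batches` cap keeps memory bounded on a busy topic, at the cost of leaving later records for the next sensor poke.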
   
   




----------------------------------------------------------------



[GitHub] [airflow] potiuk commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-984991544


   @serge-salamanka-1pt - maybe you would like to contribute it? Airflow is created by >1800 contributors, and you can become one and add Kafka support! The OSS world works this way.


-- 



[GitHub] [airflow] ashb commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-753981109


   Oh, could you also render the docs and double-check that they look right in HTML? I think `./breeze build-docs -- --package-filter apache-airflow-providers-kafka` should do it.


----------------------------------------------------------------



[GitHub] [airflow] mik-laj commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r525541168



##########
File path: airflow/providers/apache/kafka/hooks/kafka_producer.py
##########
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from kafka import KafkaProducer
+
+from airflow.hooks.base_hook import BaseHook
+
+
+class KafkaProducerHook(BaseHook):
+    """KafkaProducerHook Class."""
+
+    DEFAULT_HOST = 'localhost'
+    DEFAULT_PORT = 9092
+
+    def __init__(self, conn_id, topic):
+        super().__init__(None)
+        self.conn_id = conn_id
+        self._conn = None
+        self.server = None
+        self.consumer = None
+        self.producer = None
+        self.topic = topic
+
+    def get_conn(self) -> KafkaProducer:
+        """
+            Returns a Kafka Producer
+
+        :return:
+            A Kafka Producer object.
+        """
+        if not self._conn:

Review comment:
       This attribute is never set. In other words, this condition is always met. Is that expected?




----------------------------------------------------------------



[GitHub] [airflow] github-actions[bot] commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-731232117


   [The Workflow run](https://github.com/apache/airflow/actions/runs/374623650) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------



[GitHub] [airflow] mik-laj commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r524661559



##########
File path: airflow/providers/apache/kafka/provider.yaml
##########
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-apache-kafka
+description: |
+    `Apache Kafka <http://kafka.apache.org/>`__.
+
+versions:
+  - 2.0.0b2
+
+integrations:
+  - integration-name: Apache Kafka
+    external-doc-url: http://kafka.apache.org/
+    tags: [apache]

Review comment:
       This is essential for the automatically generated operators and hooks reference.




----------------------------------------------------------------



[GitHub] [airflow] github-actions[bot] commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-728343544


   [The Workflow run](https://github.com/apache/airflow/actions/runs/366875917) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------



[GitHub] [airflow] mik-laj edited a comment on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
mik-laj edited a comment on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-729232971


   It seems to me that a few unit tests would help us prevent regressions in the future. Can you add some?


----------------------------------------------------------------



[GitHub] [airflow] ashb commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-845912208


   Yup, I'll pick this up and finish it off/get it over the line.
   
   Sorry I forgot about it (AGAIN!)





[GitHub] [airflow] potiuk commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-987086352


   > @potiuk I have the same requirement. If I am able to implement it, I will raise a PR.
   
   Cool!





[GitHub] [airflow] github-actions[bot] closed pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #12388:
URL: https://github.com/apache/airflow/pull/12388


   





[GitHub] [airflow] dferguson992 commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
dferguson992 commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r526092240



##########
File path: airflow/providers/apache/kafka/sensors/kafka.py
##########
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from cached_property import cached_property
+
+from airflow.providers.apache.kafka.hooks.kafka_consumer import KafkaConsumerHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class KafkaSensor(BaseSensorOperator):
+    """Consumes the Kafka message with the specific topic"""

Review comment:
       no can do, it fails the black style checks







[GitHub] [airflow] github-actions[bot] commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-729022523


   [The Workflow run](https://github.com/apache/airflow/actions/runs/368445316) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] DestroyerAlpha commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
DestroyerAlpha commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-872722576


   What is the update on this? Is it a work in progress? If so, is there an expected timeline? If not, what alternatives could we try for a task that needs to check a Kafka topic for new events and then trigger another task?





[GitHub] [airflow] dferguson992 commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
dferguson992 commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-755054247


   > ./breeze build-docs -- --package-filter apache-airflow-providers-kafka
   
   Is there a way to run breeze on Windows?  I can't seem to successfully run breeze on Windows or within Cygwin.





[GitHub] [airflow] RosterIn commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
RosterIn commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r574311600



##########
File path: airflow/providers/apache/kafka/sensors/kafka.py
##########
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from cached_property import cached_property
+
+from airflow.providers.apache.kafka.hooks.kafka_consumer import KafkaConsumerHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class KafkaSensor(BaseSensorOperator):
+    """Consumes the Kafka message with the specific topic"""
+
+    DEFAULT_HOST = 'kafka1'
+    DEFAULT_PORT = 9092
+    templated_fields = ('topic', 'host', 'port', ß)

Review comment:
       ```suggestion
       templated_fields = ('topic', 'host', 'port')
   ```

##########
File path: tests/providers/apache/kafka/sensors/test_kafka.py
##########
@@ -0,0 +1,70 @@
+# #
+# # Licensed to the Apache Software Foundation (ASF) under one
+# # or more contributor license agreements.  See the NOTICE file
+# # distributed with this work for additional information
+# # regarding copyright ownership.  The ASF licenses this file
+# # to you under the Apache License, Version 2.0 (the
+# # "License"); you may not use this file except in compliance
+# # with the License.  You may obtain a copy of the License at
+# #
+# #   http://www.apache.org/licenses/LICENSE-2.0
+# #
+# # Unless required by applicable law or agreed to in writing,
+# # software distributed under the License is distributed on an
+# # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# # KIND, either express or implied.  See the License for the
+# # specific language governing permissions and limitations
+# # under the License.
+#
+# from cached_property import cached_property
+#
+# from airflow.providers.apache.kafka.hooks.kafka_consumer import KafkaConsumerHook
+# from airflow.sensors.base_sensor_operator import BaseSensorOperator
+# from airflow.utils.decorators import apply_defaults
+#
+#
+# class KafkaSensor(BaseSensorOperator):
+#     """Consumes the Kafka message with the specific topic"""
+#
+#     DEFAULT_HOST = 'kafka1'
+#     DEFAULT_PORT = 9092
+#     templated_fields = ('topic', 'host', 'port', ß)
+#
+#     @apply_defaults
+#     def __init__(self, topic, host=DEFAULT_HOST, port=DEFAULT_PORT, *args, **kwargs):
+#         """
+#             Initialize the sensor, the connection establish
+#             is put off to it's first time usage.
+#
+#         :param topic:
+#         :param host:
+#         :param port:
+#         :param args:
+#         :param kwargs:
+#         """

Review comment:
       Why is all the code commented out?







[GitHub] [airflow] dferguson992 commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
dferguson992 commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r526084695



##########
File path: docs/operators-and-hooks-ref.rst
##########
@@ -147,9 +147,91 @@ Software operators and hooks
 These integrations allow you to perform various operations within software developed by Apache Software
 Foundation.
 
-.. operators-hooks-ref::
-   :tags: apache
-   :header-separator: "
+.. list-table::

Review comment:
       so delete lines 150 - 235?







[GitHub] [airflow] github-actions[bot] commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-728300257


   [The Workflow run](https://github.com/apache/airflow/actions/runs/366755498) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] mik-laj commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r525542613



##########
File path: airflow/providers/apache/kafka/hooks/kafka_producer.py
##########
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from kafka import KafkaProducer
+
+from airflow.hooks.base_hook import BaseHook
+
+
+class KafkaProducerHook(BaseHook):
+    """KafkaProducerHook Class."""
+
+    DEFAULT_HOST = 'localhost'
+    DEFAULT_PORT = 9092
+
+    def __init__(self, conn_id, topic):
+        super().__init__(None)
+        self.conn_id = conn_id
+        self._conn = None
+        self.server = None
+        self.consumer = None
+        self.producer = None
+        self.topic = topic
+
+    def get_conn(self) -> KafkaProducer:
+        """
+            Returns a Kafka Producer
+
+        :return:
+            A Kafka Producer object.
+        """
+        if not self._conn:
+            _conn = self.get_connection(self.conn_id)
+            service_options = _conn.extra_dejson
+            host = _conn.host or self.DEFAULT_HOST
+            port = _conn.port or self.DEFAULT_PORT
+
+            self.server = f"""{host}:{port}"""
+            self.consumer = KafkaProducer(bootstrap_servers=self.server, **service_options)
+        return self.producer
+
+    def send_message(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
+        """
+            Sends a message on the specified topic and partition.  Keyed messages will be sent in order.
+
+        :param topic:
+        :param value:
+        :param key:
+        :param partition:
+        :param timestamp_ms:
+        :return:
+        """
+        producer = self.get_conn()
+        try:
+            future_record_metadata = producer.send(
+                topic, value=value, key=key, partition=partition, timestamp_ms=timestamp_ms
+            )
+        finally:
+            producer.close()
+        return future_record_metadata
+
+    def __repr__(self):
+        """
+            A pretty version of the connection string.
+
+        :return:
+            A pretty version of the connection string.
+        """
+        connected = self.producer is not None
+        return '<KafkaProducerHook ' 'connected?=%s server=%s topic=%s>' % (

Review comment:
       ```suggestion
           return '<KafkaProducerHook connected?=%s server=%s topic=%s>' % (
   ```
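For context, Python joins adjacent string literals when it parses the source, so the original two-literal form and the suggested single literal produce the same format string; the suggestion only removes the stray pair of quotes. A quick check, using made-up values for the three placeholders (they are not taken from the PR):

```python
# Adjacent string literals are concatenated by the Python parser,
# so both spellings below are the same format string.
split_form = '<KafkaProducerHook ' 'connected?=%s server=%s topic=%s>'
single_form = '<KafkaProducerHook connected?=%s server=%s topic=%s>'

assert split_form == single_form

# Hypothetical values, for illustration only.
rendered = single_form % (False, 'localhost:9092', 'example-topic')
print(rendered)
```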







[GitHub] [airflow] mik-laj commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-755055903


   > Is there a way to run breeze on Windows? I can't seem to successfully run breeze on Windows or within Cygwin.
   
   Yes. But only with Docker running in WSL 2. See: https://github.com/apache/airflow/blob/master/BREEZE.rst#docker-in-wsl-2





[GitHub] [airflow] mik-laj commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-729232971


   It seems to me that a little unit testing would help us prevent regressions in the future. Can you add some?





[GitHub] [airflow] ashb commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-749054835


   The structure looks good -- I'll try and rebase this for you.





[GitHub] [airflow] rotten commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
rotten commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-873006674


   My suggestion, as an interim solution, is to spin up a container running a faust app, and have it read the messages and then post to the airflow api to trigger a job.
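The "post to the airflow api" half of that workaround might look roughly like the sketch below. It assumes the Airflow 2.x stable REST API is enabled and that a DAG run is created by a POST to `/api/v1/dags/{dag_id}/dagRuns`; the base URL, DAG id, and message fields are hypothetical, and authentication is left out. The function only builds the request, so whatever consumes the Kafka messages (a Faust agent, for instance) would still have to send it.

```python
import json
import urllib.request


def build_trigger_request(base_url, dag_id, payload):
    """Build (but do not send) a POST request that asks Airflow to start a DAG run.

    Assumes the Airflow 2.x stable REST API is reachable at base_url;
    authentication headers are deliberately omitted from this sketch.
    """
    url = f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns"
    body = json.dumps({"conf": payload}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical values: one Kafka message becomes the `conf` of one DAG run.
request = build_trigger_request(
    "http://localhost:8080", "process_kafka_event", {"offset": 42, "value": "hello"}
)
print(request.get_method(), request.full_url)
```

Passing the message through `conf` keeps the triggered DAG independent of Kafka: it only sees a plain JSON payload.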





[GitHub] [airflow] ashb commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-854822852


   > Yup, I'll pick this up and finish it off/get it over the line.
   > 
   > Sorry I forgot about it (AGAIN!)
   
   I've taken a look at this PR, and it's further from being ready to merge than I thought.
   
   - There are no unit tests to speak of (or at all, really)
   - The Sensor takes a host and port, but no connection id parameter
   - The consumer Hook takes host and port but ignores them (it shouldn't accept them)
   - Commented-out code doesn't belong in git.
   
   
   In short: there is more work here than I thought to get this in a state to be merged, so I don't have time right now. Sorry.
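To illustrate the first two points, here is a minimal sketch of a sensor that takes a connection id instead of a host and port, together with the kind of unit test that could cover it. Everything in it is an assumption made for illustration: `KafkaSensorSketch`, the injected `hook_factory`, and the hook's `get_messages` method are hypothetical names, not code from this PR, and a real sensor would subclass Airflow's `BaseSensorOperator`.

```python
from unittest import mock


class KafkaSensorSketch:
    """Stand-in for a sensor; a real one would subclass BaseSensorOperator."""

    def __init__(self, topic, kafka_conn_id, hook_factory):
        self.topic = topic
        self.kafka_conn_id = kafka_conn_id
        # hook_factory is injected so tests can swap in a mock hook.
        self._hook_factory = hook_factory

    def poke(self, context):
        """Return True once the (hypothetical) hook yields at least one message."""
        hook = self._hook_factory(conn_id=self.kafka_conn_id, topic=self.topic)
        messages = hook.get_messages()
        return bool(messages)


def test_poke_uses_connection_id():
    hook = mock.MagicMock()
    hook.get_messages.return_value = {"partition-0": ["payload"]}
    hook_factory = mock.MagicMock(return_value=hook)

    sensor = KafkaSensorSketch("events", "kafka_default", hook_factory)

    assert sensor.poke(context={}) is True
    hook_factory.assert_called_once_with(conn_id="kafka_default", topic="events")


def test_poke_returns_false_without_messages():
    hook = mock.MagicMock()
    hook.get_messages.return_value = {}
    sensor = KafkaSensorSketch("events", "kafka_default", mock.MagicMock(return_value=hook))

    assert sensor.poke(context={}) is False


test_poke_uses_connection_id()
test_poke_returns_false_without_messages()
```

Because the hook is mocked, tests of this shape run without a Kafka broker.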





[GitHub] [airflow] mik-laj commented on a change in pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#discussion_r525541947



##########
File path: airflow/providers/apache/kafka/hooks/kafka_producer.py
##########
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from kafka import KafkaProducer
+
+from airflow.hooks.base_hook import BaseHook
+
+
+class KafkaProducerHook(BaseHook):
+    """KafkaProducerHook Class."""
+
+    DEFAULT_HOST = 'localhost'
+    DEFAULT_PORT = 9092
+
+    def __init__(self, conn_id, topic):
+        super().__init__(None)
+        self.conn_id = conn_id
+        self._conn = None
+        self.server = None
+        self.consumer = None
+        self.producer = None
+        self.topic = topic
+
+    def get_conn(self) -> KafkaProducer:
+        """
+            Returns a Kafka Producer
+
+        :return:
+            A Kafka Producer object.
+        """
+        if not self._conn:
+            _conn = self.get_connection(self.conn_id)
+            service_options = _conn.extra_dejson
+            host = _conn.host or self.DEFAULT_HOST
+            port = _conn.port or self.DEFAULT_PORT
+
+            self.server = f"""{host}:{port}"""
+            self.consumer = KafkaProducer(bootstrap_servers=self.server, **service_options)
+        return self.producer
+
+    def send_message(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
+        """
+            Sends a message on the specified topic and partition.  Keyed messages will be sent in order.
+
+        :param topic:

Review comment:
       Can you add a docstring to this method? In particular, I am interested in the difference between the `topic` parameter in `__init__` and the one in this method.
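One possible way to resolve the two `topic` parameters, sketched here purely as an assumption and not as the author's intent, is to treat the constructor's topic as a default and the method's topic as an optional override, and to say so in the docstring. `ProducerHookSketch` and `RecordingProducer` are hypothetical stand-ins; the only thing borrowed from kafka-python is the shape of `send(topic, value=..., key=...)`.

```python
class ProducerHookSketch:
    """Stand-in hook; `producer` is any object with a kafka-python style `send` method."""

    def __init__(self, producer, topic=None):
        self.producer = producer
        self.topic = topic  # default topic for this hook instance

    def send_message(self, value, topic=None, key=None):
        """
        Send one message.

        :param value: message payload
        :param topic: topic to publish to; falls back to the topic given to ``__init__``
        :param key: optional message key
        :return: whatever the underlying producer's ``send`` returns
        """
        target = topic or self.topic
        if target is None:
            raise ValueError("no topic given and no default topic configured")
        return self.producer.send(target, value=value, key=key)


class RecordingProducer:
    """Test double that records calls instead of talking to a broker."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value=None, key=None):
        self.sent.append((topic, value, key))
        return len(self.sent)


producer = RecordingProducer()
hook = ProducerHookSketch(producer, topic="default-topic")
hook.send_message(b"a")
hook.send_message(b"b", topic="override-topic")
print(producer.sent)
```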




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #12388: [AIRFLOW-6786] Added Kafka components, 3rd time's the charm

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12388:
URL: https://github.com/apache/airflow/pull/12388#issuecomment-815353587


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

