Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/11/17 22:28:39 UTC

[GitHub] [nifi] markap14 opened a new pull request #4672: NIFI-8021: Provide the ability to pin partitions to particular hosts …

markap14 opened a new pull request #4672:
URL: https://github.com/apache/nifi/pull/4672


   …when using ConsumeKafka processors
   
   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
   #### Description of PR
   
   _Enables X functionality; fixes bug NIFI-YYYY._
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [ ] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pvillard31 commented on a change in pull request #4672: NIFI-8021: Provide the ability to pin partitions to particular hosts …

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on a change in pull request #4672:
URL: https://github.com/apache/nifi/pull/4672#discussion_r526136155



##########
File path: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
             written to a FlowFile by serializing the message with the configured Record Writer.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka topics in such a way that the topics to consume from are randomly
+            assigned to to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming

Review comment:
       ```suggestion
               assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
   ```

##########
File path: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
             written to a FlowFile by serializing the message with the configured Record Writer.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka topics in such a way that the topics to consume from are randomly
+            assigned to to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
+            NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+            Node 3 will then be assigned partitions 6 and 7.
+        </p>
+
+        <p>
+            In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+            For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases
+            where this is undesirable.
+        </p>
+
+        <p>
+            One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario. Consider that Node 3
+            has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, and that takes
+            15 minutes to complete. In the meantime, Partitions 6 and 7 have been reassigned to the other nodes. Those nodes then proceeded to pull data from
+            Kafka and deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+            it has already pulled from Kafka to the destination system. Now, those records have been delivered out of order.
+        </p>
+
+        <p>
+            The solution for this, then, is to assign partitions statically instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 specifically.
+            Then, if Node 3 is restarted, the other nodes will not pull data from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is restarted. By
+            using this approach, we can ensure that the data that already was pulled can be processed (assuming First In First Out Prioritizers are used) before newer messages
+            are handled.
+        </p>
+
+        <p>
+            In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme
+            <code>partitions.&lt;hostname&gt;</code> with the value being a comma-separated list of Kafka partitions to use. For example,
+            <code>partitions.nifi-01=0, 3, 6, 9</code>, <code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 8, 11</code>.
+            The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in
+            the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a Property may be
+            added for the hostname with an empty string as the value.
+        </p>
+
+        <p>
+            In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern."

Review comment:
       ```suggestion
               In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern".
   ```
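
For readers following the `partitions.<hostname>` naming scheme described in the hunk above, here is a minimal sketch of how such properties could be turned into a host-to-partition mapping. It is not the code from this pull request: the class `StaticPartitionMapping`, the method `parse`, and the idea of passing the cluster's hostnames in as a set are all assumptions made for illustration. It covers only what the documentation states: comma-separated partition lists, an empty value meaning "no partitions", and a required entry for every node.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not NiFi's actual implementation.
final class StaticPartitionMapping {
    static final String PREFIX = "partitions.";

    // Builds hostname -> partition list from properties named "partitions.<hostname>".
    // Throws if any host in clusterHosts has no entry, mirroring the documented rule
    // that every node needs one (an empty value is allowed and means "no partitions").
    static Map<String, List<Integer>> parse(Map<String, String> properties, Set<String> clusterHosts) {
        Map<String, List<Integer>> mapping = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            if (!entry.getKey().startsWith(PREFIX)) {
                continue; // unrelated processor property
            }
            String host = entry.getKey().substring(PREFIX.length());
            List<Integer> partitions = new ArrayList<>();
            for (String token : entry.getValue().split(",")) {
                String trimmed = token.trim();
                if (!trimmed.isEmpty()) {
                    partitions.add(Integer.parseInt(trimmed));
                }
            }
            mapping.put(host, partitions);
        }
        for (String host : clusterHosts) {
            if (!mapping.containsKey(host)) {
                throw new IllegalArgumentException("No " + PREFIX + "<hostname> entry for node " + host);
            }
        }
        return mapping;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("partitions.nifi-01", "0, 3, 6, 9");
        properties.put("partitions.nifi-02", "1, 4, 7, 10");
        properties.put("partitions.nifi-03", "2, 5, 8, 11");
        Set<String> hosts = new LinkedHashSet<>(Arrays.asList("nifi-01", "nifi-02", "nifi-03"));
        System.out.println(parse(properties, hosts));
    }
}
```

The sketch matches hostnames literally. The documentation also allows the fully qualified hostname, the "simple" hostname, or the IP address, which a real implementation would need to resolve.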

##########
File path: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
             of the Kafka message.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka topics in such a way that the topics to consume from are randomly
+            assigned to to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
+            NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+            Node 3 will then be assigned partitions 6 and 7.
+        </p>
+
+        <p>
+            In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+            For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases
+            where this is undesirable.
+        </p>
+
+        <p>
+            One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario. Consider that Node 3
+            has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, and that takes
+            15 minutes to complete. In the meantime, Partitions 6 and 7 have been reassigned to the other nodes. Those nodes then proceeded to pull data from
+            Kafka and deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+            it has already pulled from Kafka to the destination system. Now, those records have been delivered out of order.
+        </p>
+
+        <p>
+            The solution for this, then, is to assign partitions statically instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 specifically.
+            Then, if Node 3 is restarted, the other nodes will not pull data from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is restarted. By
+            using this approach, we can ensure that the data that already was pulled can be processed (assuming First In First Out Prioritizers are used) before newer messages
+            are handled.
+        </p>
+
+        <p>
+            In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme
+            <code>partitions.&lt;hostname&gt;</code> with the value being a comma-separated list of Kafka partitions to use. For example,
+            <code>partitions.nifi-01=0, 3, 6, 9</code>, <code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 8, 11</code>.
+            The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in
+            the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a Property may be
+            added for the hostname with an empty string as the value.
+        </p>
+
+        <p>
+            In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern."

Review comment:
       ```suggestion
               In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern".
   ```
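
To make the 8-partition, 3-node scenario in the hunk above concrete, the sketch below splits partitions into contiguous ranges and recomputes the split after a node leaves. This is an illustration only: the class `RangeSplit` and the method `split` are hypothetical, and the actual distribution is decided by the Kafka consumer group's configured assignor, not by this code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of a contiguous (range-style) split; not Kafka's or NiFi's code.
final class RangeSplit {

    // Gives each node partitionCount / nodes.size() partitions, with the first
    // (partitionCount % nodes.size()) nodes receiving one extra.
    static Map<String, List<Integer>> split(List<String> nodes, int partitionCount) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("At least one node is required");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int base = partitionCount / nodes.size();
        int extra = partitionCount % nodes.size();
        int next = 0;
        for (int i = 0; i < nodes.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                partitions.add(next++);
            }
            assignment.put(nodes.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // All three nodes consuming.
        System.out.println(split(Arrays.asList("Node 1", "Node 2", "Node 3"), 8));
        // Node 3 stops consuming: its partitions are redistributed to the remaining nodes.
        System.out.println(split(Arrays.asList("Node 1", "Node 2"), 8));
    }
}
```

In the second call, partitions 6 and 7 end up on a node other than Node 3, which is the rebalance that lets CDC records be delivered out of order when Node 3 later returns with messages it had already pulled.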

##########
File path: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
             of the Kafka message.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka topics in such a way that the topics to consume from are randomly
+            assigned to to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming

Review comment:
       ```suggestion
               assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
   ```

##########
File path: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
             written to a FlowFile by serializing the message with the configured Record Writer.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka topics in such a way that the topics to consume from are randomly
+            assigned to to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming

Review comment:
       ```suggestion
               assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
   ```

##########
File path: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
             of the Kafka message.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka topics in such a way that the topics to consume from are randomly
+            assigned to to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming

Review comment:
       ```suggestion
               assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
   ```

##########
File path: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
             written to a FlowFile by serializing the message with the configured Record Writer.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka topics in such a way that the topics to consume from are randomly
+            assigned to to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
+            NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+            Node 3 will then be assigned partitions 6 and 7.
+        </p>
+
+        <p>
+            In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+            For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases
+            where this is undesirable.
+        </p>
+
+        <p>
+            One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario. Consider that Node 3
+            has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, and that takes
+            15 minutes to complete. In the meantime, Partitions 6 and 7 have been reassigned to the other nodes. Those nodes then proceeded to pull data from
+            Kafka and deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+            it has already pulled from Kafka to the destination system. Now, those records have been delivered out of order.
+        </p>
+
+        <p>
+            The solution for this, then, is to assign partitions statically instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 specifically.
+            Then, if Node 3 is restarted, the other nodes will not pull data from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is restarted. By
+            using this approach, we can ensure that the data that already was pulled can be processed (assuming First In First Out Prioritizers are used) before newer messages
+            are handled.
+        </p>
+
+        <p>
+            In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme
+            <code>partitions.&lt;hostname&gt;</code> with the value being a comma-separated list of Kafka partitions to use. For example,
+            <code>partitions.nifi-01=0, 3, 6, 9</code>, <code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 8, 11</code>.
+            The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in
+            the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a Property may be
+            added for the hostname with an empty string as the value.
+        </p>
+
+        <p>
+            In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern."

Review comment:
       ```suggestion
               In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern".
   ```

##########
File path: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
             of the Kafka message.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka topics in such a way that the topics to consume from are randomly
+            assigned to to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
+            NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+            Node 3 will then be assigned partitions 6 and 7.
+        </p>
+
+        <p>
+            In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+            For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases
+            where this is undesirable.
+        </p>
+
+        <p>
+            One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario. Consider that Node 3
+            has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, and that takes
+            15 minutes to complete. In the meantime, Partitions 6 and 7 have been reassigned to the other nodes. Those nodes then proceeded to pull data from
+            Kafka and deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+            it has already pulled from Kafka to the destination system. Now, those records have been delivered out of order.
+        </p>
+
+        <p>
+            The solution for this, then, is to assign partitions statically instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 specifically.
+            Then, if Node 3 is restarted, the other nodes will not pull data from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is restarted. By
+            using this approach, we can ensure that the data that already was pulled can be processed (assuming First In First Out Prioritizers are used) before newer messages
+            are handled.
+        </p>
+
+        <p>
+            In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme
+            <code>partitions.&lt;hostname&gt;</code> with the value being a comma-separated list of Kafka partitions to use. For example,
+            <code>partitions.nifi-01=0, 3, 6, 9</code>, <code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 8, 11</code>.
+            The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in
+            the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a Property may be
+            added for the hostname with an empty string as the value.
+        </p>
+
+        <p>
+            In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern."

Review comment:
       ```suggestion
               In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern".
   ```







[GitHub] [nifi] asfgit closed pull request #4672: NIFI-8021: Provide the ability to pin partitions to particular hosts …

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #4672:
URL: https://github.com/apache/nifi/pull/4672


   

