Posted to issues@nifi.apache.org by "Mark Payne (Jira)" <ji...@apache.org> on 2020/12/11 20:09:00 UTC

[jira] [Comment Edited] (NIFI-8021) Allow ConsumeKafka processors to use static partition mapping

    [ https://issues.apache.org/jira/browse/NIFI-8021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17248145#comment-17248145 ] 

Mark Payne edited comment on NIFI-8021 at 12/11/20, 8:08 PM:
-------------------------------------------------------------

Re-opening this issue because I found a bug. If you have fewer partitions than NiFi nodes, then in order to pin specific partitions to nodes you must enter an empty string for at least one node, since all partitions are already accounted for by the other nodes.
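Here is a minimal sketch of how a per-host partition mapping might be parsed. It is for illustration only and assumes dynamic properties named `partitions.<hostname>` whose values are comma-separated partition numbers. `StaticPartitionMapping` and its methods are hypothetical and are not NiFi's `ConsumerPartitionsUtil`. With 2 partitions and 3 nodes, the third node's value has to be the empty string, which parses to an empty array.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, NOT NiFi's ConsumerPartitionsUtil.
// The "partitions.<hostname>" property naming is an assumption for illustration.
final class StaticPartitionMapping {
    static final String PREFIX = "partitions.";

    // Parse a comma-separated list such as "0, 2" into {0, 2}; "" yields an empty array.
    static int[] parsePartitions(final String value) {
        final String trimmed = value.trim();
        if (trimmed.isEmpty()) {
            return new int[0];
        }
        return Arrays.stream(trimmed.split(","))
                .map(String::trim)
                .mapToInt(Integer::parseInt)
                .toArray();
    }

    // Build hostname -> partitions from every property that uses the assumed prefix.
    static Map<String, int[]> mapHostsToPartitions(final Map<String, String> properties) {
        final Map<String, int[]> result = new LinkedHashMap<>();
        for (final Map.Entry<String, String> entry : properties.entrySet()) {
            if (entry.getKey().startsWith(PREFIX)) {
                final String host = entry.getKey().substring(PREFIX.length());
                result.put(host, parsePartitions(entry.getValue()));
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        // Topic with 2 partitions, cluster with 3 nodes: node3 must be given "".
        final Map<String, String> props = new LinkedHashMap<>();
        props.put("partitions.node1", "0");
        props.put("partitions.node2", "1");
        props.put("partitions.node3", "");
        mapHostsToPartitions(props).forEach((host, parts) ->
                System.out.println(host + " -> " + Arrays.toString(parts)));
    }
}
```

Running `main` prints `node1 -> [0]`, `node2 -> [1]` and `node3 -> []`. The empty array for node3 is exactly the configuration that leads to the errors described next.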

However, using an empty string results in a large number of ERROR logs. For example:
{code:java}
2020-12-11 14:51:21,680 ERROR [Timer-Driven Process Thread-2] o.a.n.p.kafka.pubsub.ConsumeKafka_2_0 ConsumeKafka_2_0[id=535add08-0176-1000-ffff-ffff98b76af7] Exception while processing data from kafka so will close the lease org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@553f24a1 due to java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions: java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1163)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
        at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:181)
        at org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0.onTrigger(ConsumeKafka_2_0.java:444)
        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1173)
        at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
        at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748){code}
Fortunately, this should be easy to fix. In the createConsumerPool method of each Consume* processor, we do something like:
{code:java}
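// Look up the partitions statically mapped to this host
final int[] partitionsToConsume;
try {
    partitionsToConsume = ConsumerPartitionsUtil.getPartitionsForHost(context.getAllProperties(), getLogger());
} catch (final UnknownHostException uhe) {
    // The static mapping is keyed by hostname, so it cannot be resolved without one
    throw new ProcessException("Could not determine localhost's hostname", uhe);
} {code}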
An easy fix would be to add a `volatile boolean hasPartitions;` member variable and, in this section of the createConsumerPool method, simply set `this.hasPartitions = partitionsToConsume.length > 0;`

Then, in the `onTrigger` method, check this flag:
{code:java}
if (!this.hasPartitions) {
    // Nothing is mapped to this node, so polling would throw IllegalStateException; back off instead
    getLogger().debug("This host has no partitions so will not poll Kafka.");
    context.yield();
    return;
} {code}
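The proposed guard can be modeled end to end without NiFi or Kafka on the classpath. In this hedged sketch, `FakeConsumer` and `GuardedConsumeProcessor` are hypothetical stand-ins (not NiFi or Kafka classes), `createConsumerPool(int[])` takes the already-resolved partitions instead of a ProcessContext, and a counter replaces `context.yield()`. One assumption worth checking against the real `ConsumerPartitionsUtil`: if `getPartitionsForHost` returns null when no static mapping is configured, then `partitionsToConsume.length` would throw, so the flag should treat null as "has partitions" and let the ordinary subscribe() path keep polling. The sketch does that.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for KafkaConsumer; NOT the Kafka client API.
final class FakeConsumer {
    private final int[] assigned; // null models subscribe() mode (no static mapping)

    FakeConsumer(final int[] assigned) {
        this.assigned = assigned;
    }

    // Mirrors the logged failure: polling with an empty assignment throws.
    List<String> poll() {
        if (assigned != null && assigned.length == 0) {
            throw new IllegalStateException(
                    "Consumer is not subscribed to any topics or assigned any partitions");
        }
        final List<String> records = new ArrayList<>();
        if (assigned != null) {
            for (final int partition : assigned) {
                records.add("partition-" + partition); // one fake record per partition
            }
        }
        return records;
    }
}

// Hypothetical stand-in for a ConsumeKafka* processor, showing only the proposed guard.
final class GuardedConsumeProcessor {
    // Written in createConsumerPool, read by every onTrigger thread, hence volatile.
    private volatile boolean hasPartitions;
    private volatile FakeConsumer consumer;
    int yields; // stand-in for context.yield() calls; single-threaded demo counter only

    void createConsumerPool(final int[] partitionsToConsume) {
        // Assumption: null means no static mapping, so the normal subscribe() path still polls.
        this.hasPartitions = partitionsToConsume == null || partitionsToConsume.length > 0;
        this.consumer = new FakeConsumer(partitionsToConsume);
    }

    List<String> onTrigger() {
        if (!this.hasPartitions) {
            // Stand-in for getLogger().debug(...); context.yield(); return;
            yields++;
            return new ArrayList<>();
        }
        return consumer.poll();
    }
}
```

Computing the flag once, when the pool is created, keeps the per-trigger cost to a single volatile read. Yielding also lets NiFi stop scheduling the idle node for a while instead of repeatedly failing and closing consumer leases.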


> Allow ConsumeKafka processors to use static partition mapping
> -------------------------------------------------------------
>
>                 Key: NIFI-8021
>                 URL: https://issues.apache.org/jira/browse/NIFI-8021
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Major
>             Fix For: 1.13.0
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> With the ConsumeKafka processors, we use the KafkaConsumer.subscribe() method, which is generally what is desired. However, in cases where strict per-partition FIFO ordering is critical, as with CDC among other use cases, we should allow users to pin a particular partition to a particular node. That way, even if the node is restarted or stops pulling data for a while, its partitions are not reassigned, so there is no risk of messages being delivered out of order.
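The ordering argument can be illustrated with a toy model that contrasts dynamic reassignment with a static pin. This is a hypothetical sketch: the round-robin `rebalance` method is only a stand-in and is not how Kafka's consumer-group assignors actually distribute partitions. When node2 drops out, the dynamic model hands partition 1 to node1, whereas the static model leaves partition 1 unconsumed until node2 returns, so its records are never read by a second node.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: real consumer-group rebalancing uses Kafka's pluggable assignors,
// not this simple round-robin.
final class AssignmentComparison {

    // Dynamic, subscribe()-like: spread every partition across whichever nodes are live.
    static Map<String, List<Integer>> rebalance(final List<String> liveNodes, final int partitionCount) {
        final Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        if (liveNodes.isEmpty()) {
            return assignment; // nobody to assign to
        }
        for (final String node : liveNodes) {
            assignment.put(node, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            assignment.get(liveNodes.get(p % liveNodes.size())).add(p);
        }
        return assignment;
    }

    // Static, pinned: each live node gets only its configured partitions; a down node's
    // partitions stay unconsumed until it returns instead of moving elsewhere.
    static Map<String, List<Integer>> pinned(final List<String> liveNodes,
                                             final Map<String, List<Integer>> mapping) {
        final Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (final String node : liveNodes) {
            assignment.put(node, mapping.getOrDefault(node, Collections.emptyList()));
        }
        return assignment;
    }
}
```

With nodes `node1` and `node2` and two partitions, `rebalance` gives partition 1 to node2. If node2 drops out, `rebalance` moves partition 1 to node1, while `pinned` leaves node1 with only partition 0.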



--
This message was sent by Atlassian Jira
(v8.3.4#803005)