You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by david-streamlio <gi...@git.apache.org> on 2018/03/15 16:43:38 UTC

[GitHub] nifi pull request #2553: Nifi 4908 rebase

GitHub user david-streamlio opened a pull request:

    https://github.com/apache/nifi/pull/2553

    Nifi 4908 rebase

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/david-streamlio/nifi NIFI-4908-rebase

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2553.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2553
    
----
commit e4f8550159132d85d0e19d61f24e087e51742dee
Author: David Kjerrumgaard <da...@...>
Date:   2018-02-24T01:40:22Z

    Added Pulsar processors and Controller Service

commit b8b9601f2469df811b59b1019f0b7052dff9abd3
Author: David Kjerrumgaard <da...@...>
Date:   2018-02-26T19:19:26Z

    Updated all nifi component version references to 1.6.0-SNAPSHOT

commit 2ebdc06394d3d6f5fa71ec07fc5d086e81f4e632
Author: David Kjerrumgaard <da...@...>
Date:   2018-02-26T19:48:56Z

    Merge remote-tracking branch 'upstream/master' into NIFI-4908

commit d4e82ecc49e62596264db206f26b3b6a93f4b021
Author: David Kjerrumgaard <da...@...>
Date:   2018-02-28T18:18:35Z

    Refactored Processors to indicate the supported version of Pulsar, i.e _1_0

commit 8b24b2496d404a65e6427273395ae73e558a5531
Author: David Kjerrumgaard <da...@...>
Date:   2018-02-24T01:40:22Z

    Added Pulsar processors and Controller Service

commit 29ca7f2b653503de9ad13da9f6c49cc2f311eaae
Author: David Kjerrumgaard <da...@...>
Date:   2018-02-26T19:19:26Z

    Updated all nifi component version references to 1.6.0-SNAPSHOT

commit 3e30c9530ab0cd844fb900607dd00ff1a4014221
Author: David Kjerrumgaard <da...@...>
Date:   2018-02-28T18:18:35Z

    Refactored Processors to indicate the supported version of Pulsar, i.e _1_0

commit 20b451d66ba6f7bbcc057f752a25101e33580469
Author: David Kjerrumgaard <da...@...>
Date:   2018-02-24T01:40:22Z

    Added Pulsar processors and Controller Service

commit 475491195d2f1fc2c24a4f403d6fc6cfd30982b0
Author: David Kjerrumgaard <da...@...>
Date:   2018-02-26T19:19:26Z

    Updated all nifi component version references to 1.6.0-SNAPSHOT

commit 046e28703914f66efbea27a6112bc88b33750bdf
Author: David Kjerrumgaard <da...@...>
Date:   2018-02-28T18:18:35Z

    Refactored Processors to indicate the supported version of Pulsar, i.e _1_0

commit f36b0b61680c55a5ac0279a44cdc580b42f20118
Author: David Kjerrumgaard <da...@...>
Date:   2018-03-10T03:01:06Z

    Combined Pulsar components into a single bundle

commit 7de9f99be1724178e768e89737a1b2d57e45f08b
Author: David Kjerrumgaard <da...@...>
Date:   2018-03-10T05:48:25Z

    Merged with HEAD

commit 03e8ef2a19381cd3249eb45c471c36b4ae265566
Author: David Kjerrumgaard <da...@...>
Date:   2018-03-10T06:26:46Z

    Fixed parent.relativePath in POM files

commit e61bc9b559cac45200159efa6d875d68a5713cfa
Author: David Kjerrumgaard <da...@...>
Date:   2018-03-10T06:29:43Z

    Added nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-nar/ module

commit 27d3f86315dd91fc8e61d4399caa289304bf5fd2
Author: David Kjerrumgaard <da...@...>
Date:   2018-03-10T06:41:34Z

    Added dependency versions that appveyor complained about

commit a6aff8ccbc5cc6fd87bddb3c1ffb0a7821b40e7f
Author: David Kjerrumgaard <da...@...>
Date:   2018-03-10T07:04:53Z

    Added nifi-pulsar-client-service dependency to nifi-nar-bundle

commit 0c186183fa1ee998a61960b2494838459a324023
Author: David Kjerrumgaard <da...@...>
Date:   2018-03-10T15:25:00Z

    Fixed Maven dependencies

----


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174883778
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java ---
    @@ -0,0 +1,373 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.nifi.pulsar.PulsarProducer;
    +import org.apache.nifi.pulsar.cache.LRUCache;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.pulsar.client.api.CompressionType;
    +import org.apache.pulsar.client.api.MessageId;
    +import org.apache.pulsar.client.api.Producer;
    +import org.apache.pulsar.client.api.ProducerConfiguration;
    +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +
    +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
    +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the Pulsar 1.21 Producer API."
    +    + "The messages to send may be individual FlowFiles or may be delimited, using a "
    +    + "user-specified delimiter, such as a new-line. "
    +    + "The complementary NiFi processor for fetching messages is ConsumePulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to "
    +        + "FlowFiles that are routed to success.")
    +public class PublishPulsar_1_0 extends AbstractPulsarProcessor {
    --- End diff --
    
    Consider changing it to `PublishPulsar_1_X`.


---

[GitHub] nifi issue #2553: Nifi 4908 rebase

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2553
  
    @MikeThomsen   We have a pre-configured Docker image that has Pulsar if you want to use that for testing.  See https://streaml.io/docs/getting-started. for detailed instructions.


---

[GitHub] nifi issue #2553: Nifi 4908 rebase

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2553
  
    At this risk of sounding stupid, can you spell out the steps you are suggesting? I want to make sure I don't screw up the process.  I will close this PR once I "rebase 4914 and squash it into one commit"


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174866349
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/PoolableResource.java ---
    @@ -0,0 +1,25 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar.pool;
    +
    +public interface PoolableResource {
    +
    +    public void close();
    +
    +    public boolean isClosed();
    --- End diff --
    
    Javadoc


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174879028
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml ---
    @@ -0,0 +1,78 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-pulsar-bundle</artifactId>
    +        <version>1.6.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-pulsar-processors</artifactId>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-api</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-record-serialization-service-api</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-record</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-utils</artifactId>
    +            <version>1.6.0-SNAPSHOT</version>
    +        </dependency>
    +         <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-ssl-context-service-api</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-pulsar-client-service-api</artifactId>
    +            <version>1.6.0-SNAPSHOT</version>
    +            <scope>provided</scope>
    +	    </dependency>
    +        <dependency>
    +    			<groupId>org.apache.pulsar</groupId>
    --- End diff --
    
    Broken indentation level.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174882522
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorCompletionService;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.nifi.pulsar.PulsarConsumer;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.ConsumerConfiguration;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +import org.apache.pulsar.client.api.SubscriptionType;
    +
    +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@CapabilityDescription("Consumes messages from Apache Pulsar "
    +        + "The complementary NiFi processor for sending messages is PublishPulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
    +
    +    static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name");
    +    static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages");
    +    static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer "
    +            + "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages");
    +
    +    protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
    +            .name("topic")
    +            .displayName("Topic Name")
    +            .description("The name of the Pulsar Topic.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
    +            .name("Subscription")
    +            .displayName("Subscription Name")
    +            .description("The name of the Pulsar subscription to consume from.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor ASYNC_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Async Enabled")
    +            .description("Control whether the messages will be consumed asyncronously or not. Messages consumed"
    +                    + " syncronously will be acknowledged immediately before processing the next message, while"
    +                    + " asyncronous messages will be acknowledged after the Pulsar broker responds.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_ASYNC_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum Async Requests")
    +            .description("The maximum number of outstanding asynchronous consumer requests for this processor. "
    +                    + "Each asynchronous call requires memory, so avoid setting this value to high.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor ACK_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Acknowledgment Timeout")
    +            .description("Set the timeout (in milliseconds) for unacked messages, truncated to the "
    +                    + "nearest millisecond. The timeout needs to be greater than 10 seconds.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .defaultValue("10000")
    +            .build();
    +
    +    public static final PropertyDescriptor PRIORITY_LEVEL = new PropertyDescriptor.Builder()
    +            .name("Consumer Priority Level")
    +            .description("Sets priority level for the shared subscription consumers to which broker "
    +                    + "gives more priority while dispatching messages. Here, broker follows descending "
    +                    + "priorities. (eg: 0=max-priority, 1, 2,..) ")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5")
    +            .build();
    +
    +    public static final PropertyDescriptor RECEIVER_QUEUE_SIZE = new PropertyDescriptor.Builder()
    +            .name("Consumer receive queue size.")
    +            .description("The consumer receive queue controls how many messages can be accumulated "
    +                    + "by the Consumer before the application calls Consumer.receive(). Using a higher "
    +                    + "value could potentially increase the consumer throughput at the expense of bigger "
    +                    + "memory utilization. \n"
    +                    + "Setting the consumer queue size as zero, \n"
    +                    + "\t - Decreases the throughput of the consumer, by disabling pre-fetching of messages. \n"
    +                    + "\t - Doesn't support Batch-Message: if consumer receives any batch-message then it closes consumer "
    +                    + "connection with broker and consumer will not be able receive any further message unless batch-message "
    +                    + "in pipeline is removed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final PropertyDescriptor SUBSCRIPTION_TYPE = new PropertyDescriptor.Builder()
    +            .name("Subscription Type")
    +            .description("Select the subscription type to be used when subscribing to the topic.")
    +            .required(false)
    +            .allowableValues(EXCLUSIVE, SHARED, FAILOVER)
    +            .defaultValue(SHARED.getValue())
    +            .build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    // Reuse the same consumer for a given topic / subscription
    +    private PulsarConsumer consumer;
    +    private ConsumerConfiguration consumerConfig;
    +
    +    // Pool for running multiple consume Async requests
    +    ExecutorService pool;
    +    ExecutorCompletionService<Message> completionService;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void init(ProcessContext context) {
    +            pool = Executors.newFixedThreadPool(context.getProperty(MAX_ASYNC_REQUESTS).asInteger());
    +            completionService = new ExecutorCompletionService<>(pool);
    +    }
    +
    +    @OnUnscheduled
    +    public void shutDown() {
    +            // Stop all the async consumers
    +            pool.shutdownNow();
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        try {
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +                // Launch consumers
    +                consumeAsync(context, session);
    +
    +                // Handle completed consumers
    +                handleAsync(context, session);
    +
    +            } else {
    +                consume(context, session);
    +            }
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +
    +    }
    +
    +    private void handleAsync(ProcessContext context, ProcessSession session) {
    +
    +        try {
    +            Future<Message> done = completionService.take();
    +            Message msg = done.get();
    +
    +            if (msg != null) {
    +                FlowFile flowFile = null;
    +                    final byte[] value = msg.getData();
    +                    if (value != null && value.length > 0) {
    +                        flowFile = session.create();
    +                        flowFile = session.write(flowFile, out -> {
    +                            out.write(value);
    +                        });
    +                    }
    +
    +                    session.getProvenanceReporter().receive(flowFile, "From " + context.getProperty(TOPIC).getValue());
    +                    session.transfer(flowFile, REL_SUCCESS);
    +                    session.commit();
    +                    getWrappedConsumer(context).getConsumer().acknowledgeAsync(msg);
    +            }
    +
    +        } catch (InterruptedException | ExecutionException | PulsarClientException e) {
    +            getLogger().error("Trouble consuming messages ", e);
    +        }
    +
    +    }
    +
    +    @OnStopped
    +    public void close(final ProcessContext context) {
    +
    +        getLogger().info("Disconnecting Pulsar Consumer");
    +        if (consumer != null) {
    +
    +                context.getProperty(PULSAR_CLIENT_SERVICE)
    +                    .asControllerService(PulsarClientPool.class)
    +                    .getConsumerPool().evict(consumer);
    +        }
    +
    +        consumer = null;
    +    }
    +
    +    /*
    +     * For now let's assume that this processor will be configured to run for a longer
    +     * duration than 0 milliseconds. So we will be grabbing as many messages off the topic
    +     * as possible and committing them as FlowFiles
    +     */
    +    private void consumeAsync(ProcessContext context, ProcessSession session) throws PulsarClientException {
    +
    +            Consumer consumer = getWrappedConsumer(context).getConsumer();
    +
    +            completionService.submit(new Callable<Message>() {
    +                @Override
    +                public Message call() throws Exception {
    +                        return consumer.receiveAsync().get();
    +                }
    +              });
    +
    +    }
    +
    +    /*
    +     * When this Processor expects to receive many small files, it may
    +     * be advisable to create several FlowFiles from a single session
    +     * before committing the session. Typically, this allows the Framework
    +     * to treat the content of the newly created FlowFiles much more efficiently.
    +     */
    +    private void consume(ProcessContext context, ProcessSession session) throws PulsarClientException {
    +
    +        Consumer consumer = getWrappedConsumer(context).getConsumer();
    +
    +        final ComponentLog logger = getLogger();
    +        final Message msg;
    +        FlowFile flowFile = null;
    +
    +        try {
    +
    +                msg = consumer.receive();
    +                final byte[] value = msg.getData();
    +
    +                if (value != null && value.length > 0) {
    +                    flowFile = session.create();
    +                    flowFile = session.write(flowFile, out -> {
    +                        out.write(value);
    +                    });
    +
    +                    session.getProvenanceReporter().receive(flowFile, "From " + context.getProperty(TOPIC).getValue());
    +                session.transfer(flowFile, REL_SUCCESS);
    +                logger.info("Created {} from {} messages received from Pulsar Server and transferred to 'success'",
    +                            new Object[]{flowFile, 1});
    +
    +                session.commit();
    +
    +                /*
    +                 * This Processor acknowledges receipt of the data and/or removes the data
    +                 * from the external source in order to prevent receipt of duplicate files.
    +                 * This is done only after the ProcessSession by which the FlowFile was created
    +                 * has been committed! Failure to adhere to this principle may result in data
    +                 * loss, as restarting NiFi before the session has been committed will result
    +                 * in the temporary file being deleted. Note, however, that it is possible using
    +                 * this approach to receive duplicate data because the application could be
    +                 * restarted after committing the session and before acknowledging or removing
    +                 * the data from the external source. In general, though, potential data duplication
    +                 * is preferred over potential data loss.
    +                 */
    +                getLogger().info("Acknowledging message " + msg.getMessageId());
    +                consumer.acknowledge(msg);
    +
    +                } else {
    +                    // We didn't consume any data, so
    +                    session.commit();
    +                }
    +
    +        } catch (PulsarClientException e) {
    +            context.yield();
    +            session.rollback();
    +        }
    +
    +    }
    +
    +    private PulsarConsumer getWrappedConsumer(ProcessContext context) throws PulsarClientException {
    +
    +        if (consumer != null)
    --- End diff --
    
    Curly brackets.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174881541
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorCompletionService;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.nifi.pulsar.PulsarConsumer;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.ConsumerConfiguration;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +import org.apache.pulsar.client.api.SubscriptionType;
    +
    +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@CapabilityDescription("Consumes messages from Apache Pulsar "
    +        + "The complementary NiFi processor for sending messages is PublishPulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
    +
    +    static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name");
    +    static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages");
    +    static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer "
    +            + "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages");
    +
    +    protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
    +            .name("topic")
    +            .displayName("Topic Name")
    +            .description("The name of the Pulsar Topic.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
    +            .name("Subscription")
    +            .displayName("Subscription Name")
    +            .description("The name of the Pulsar subscription to consume from.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor ASYNC_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Async Enabled")
    +            .description("Control whether the messages will be consumed asyncronously or not. Messages consumed"
    +                    + " syncronously will be acknowledged immediately before processing the next message, while"
    +                    + " asyncronous messages will be acknowledged after the Pulsar broker responds.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_ASYNC_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum Async Requests")
    +            .description("The maximum number of outstanding asynchronous consumer requests for this processor. "
    +                    + "Each asynchronous call requires memory, so avoid setting this value to high.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor ACK_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Acknowledgment Timeout")
    +            .description("Set the timeout (in milliseconds) for unacked messages, truncated to the "
    +                    + "nearest millisecond. The timeout needs to be greater than 10 seconds.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .defaultValue("10000")
    +            .build();
    +
    +    public static final PropertyDescriptor PRIORITY_LEVEL = new PropertyDescriptor.Builder()
    +            .name("Consumer Priority Level")
    +            .description("Sets priority level for the shared subscription consumers to which broker "
    +                    + "gives more priority while dispatching messages. Here, broker follows descending "
    +                    + "priorities. (eg: 0=max-priority, 1, 2,..) ")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5")
    +            .build();
    +
    +    public static final PropertyDescriptor RECEIVER_QUEUE_SIZE = new PropertyDescriptor.Builder()
    +            .name("Consumer receive queue size.")
    +            .description("The consumer receive queue controls how many messages can be accumulated "
    +                    + "by the Consumer before the application calls Consumer.receive(). Using a higher "
    +                    + "value could potentially increase the consumer throughput at the expense of bigger "
    +                    + "memory utilization. \n"
    +                    + "Setting the consumer queue size as zero, \n"
    +                    + "\t - Decreases the throughput of the consumer, by disabling pre-fetching of messages. \n"
    +                    + "\t - Doesn't support Batch-Message: if consumer receives any batch-message then it closes consumer "
    +                    + "connection with broker and consumer will not be able receive any further message unless batch-message "
    +                    + "in pipeline is removed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final PropertyDescriptor SUBSCRIPTION_TYPE = new PropertyDescriptor.Builder()
    +            .name("Subscription Type")
    +            .description("Select the subscription type to be used when subscribing to the topic.")
    +            .required(false)
    +            .allowableValues(EXCLUSIVE, SHARED, FAILOVER)
    +            .defaultValue(SHARED.getValue())
    +            .build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    // Reuse the same consumer for a given topic / subscription
    +    private PulsarConsumer consumer;
    +    private ConsumerConfiguration consumerConfig;
    +
    +    // Pool for running multiple consume Async requests
    +    ExecutorService pool;
    +    ExecutorCompletionService<Message> completionService;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void init(ProcessContext context) {
    +            pool = Executors.newFixedThreadPool(context.getProperty(MAX_ASYNC_REQUESTS).asInteger());
    +            completionService = new ExecutorCompletionService<>(pool);
    +    }
    +
    +    @OnUnscheduled
    +    public void shutDown() {
    +            // Stop all the async consumers
    +            pool.shutdownNow();
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        try {
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +                // Launch consumers
    +                consumeAsync(context, session);
    +
    +                // Handle completed consumers
    +                handleAsync(context, session);
    +
    +            } else {
    +                consume(context, session);
    +            }
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +
    +    }
    +
    +    private void handleAsync(ProcessContext context, ProcessSession session) {
    +
    +        try {
    +            Future<Message> done = completionService.take();
    +            Message msg = done.get();
    +
    +            if (msg != null) {
    +                FlowFile flowFile = null;
    +                    final byte[] value = msg.getData();
    +                    if (value != null && value.length > 0) {
    +                        flowFile = session.create();
    +                        flowFile = session.write(flowFile, out -> {
    +                            out.write(value);
    +                        });
    +                    }
    +
    +                    session.getProvenanceReporter().receive(flowFile, "From " + context.getProperty(TOPIC).getValue());
    --- End diff --
    
    That should typically be a transit URL like `pulsar://ns/topic`


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174868443
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/ResourceFactory.java ---
    @@ -0,0 +1,24 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar.pool;
    +
    +import java.util.Properties;
    +
    +public interface ResourceFactory<R extends PoolableResource> {
    +
    +    public R create(Properties props) throws ResourceCreationException;
    --- End diff --
    
    Javadoc


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174884863
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java ---
    @@ -0,0 +1,373 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.nifi.pulsar.PulsarProducer;
    +import org.apache.nifi.pulsar.cache.LRUCache;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.pulsar.client.api.CompressionType;
    +import org.apache.pulsar.client.api.MessageId;
    +import org.apache.pulsar.client.api.Producer;
    +import org.apache.pulsar.client.api.ProducerConfiguration;
    +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +
    +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
    +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the Pulsar 1.21 Producer API."
    +    + "The messages to send may be individual FlowFiles or may be delimited, using a "
    +    + "user-specified delimiter, such as a new-line. "
    +    + "The complementary NiFi processor for fetching messages is ConsumePulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to "
    +        + "FlowFiles that are routed to success.")
    +public class PublishPulsar_1_0 extends AbstractPulsarProcessor {
    +
    +    protected static final String MSG_COUNT = "msg.count";
    +
    +    static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression");
    +    static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
    +    static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
    +
    +    static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition");
    +    static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route messages to all "
    +                                                                                                                       + "partitions in a round robin manner");
    +    static final AllowableValue MESSAGE_ROUTING_MODE_SINGLE_PARTITION = new AllowableValue("SinglePartition", "Single Partition", "Route messages to a single partition");
    +
    +    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
    +            .name("topic")
    +            .displayName("Topic Name")
    +            .description("The name of the Pulsar Topic.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor ASYNC_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Async Enabled")
    +            .description("Control whether the messages will be sent asyncronously or not. Messages sent"
    +                    + " syncronously will be acknowledged immediately before processing the next message, while"
    +                    + " asyncronous messages will be acknowledged after the Pulsar broker responds.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCHING_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Batching Enabled")
    +            .description("Control whether automatic batching of messages is enabled for the producer. "
    +                    + "default: false [No batching] When batching is enabled, multiple calls to "
    +                    + "Producer.sendAsync can result in a single batch to be sent to the broker, leading "
    +                    + "to better throughput, especially when publishing small messages. If compression is "
    +                    + "enabled, messages will be compressed at the batch level, leading to a much better "
    +                    + "compression ratio for similar headers or contents. When enabled default batch delay "
    +                    + "is set to 10 ms and default batch size is 1000 messages")
    +            .required(false)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCHING_MAX_MESSAGES = new PropertyDescriptor.Builder()
    +            .name("Batching Max Messages")
    +            .description("Set the maximum number of messages permitted in a batch. default: "
    +                    + "1000 If set to a value greater than 1, messages will be queued until this "
    +                    + "threshold is reached or batch interval has elapsed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("Batch Interval")
    +            .description("Set the time period within which the messages sent will be batched default: 10ms "
    +                    + "if batch messages are enabled. If set to a non zero value, messages will be queued until "
    +                    + "this time interval or until the Batching Max Messages threshould has been reached")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .defaultValue("10")
    +            .build();
    +
    +    public static final PropertyDescriptor BLOCK_IF_QUEUE_FULL = new PropertyDescriptor.Builder()
    +            .name("Block if Message Queue Full")
    +            .description("Set whether the processor should block when the outgoing message queue is full. "
    +                    + "Default is false. If set to false, send operations will immediately fail with "
    +                    + "ProducerQueueIsFullError when there is no space left in pending queue.")
    +            .required(false)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
    +            .name("Compression Type")
    +            .description("Set the compression type for the producer.")
    +            .required(false)
    +            .allowableValues(COMPRESSION_TYPE_NONE, COMPRESSION_TYPE_LZ4, COMPRESSION_TYPE_ZLIB)
    +            .defaultValue(COMPRESSION_TYPE_NONE.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor MESSAGE_ROUTING_MODE = new PropertyDescriptor.Builder()
    +            .name("Message Routing Mode")
    +            .description("Set the message routing mode for the producer. This applies only if the destination topic is partitioned")
    +            .required(false)
    +            .allowableValues(MESSAGE_ROUTING_MODE_CUSTOM_PARTITION, MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION, MESSAGE_ROUTING_MODE_SINGLE_PARTITION)
    +            .defaultValue(MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor PENDING_MAX_MESSAGES = new PropertyDescriptor.Builder()
    +            .name("Max Pending Messages")
    +            .description("Set the max size of the queue holding the messages pending to receive an "
    +                    + "acknowledgment from the broker.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    private LRUCache<String, PulsarProducer> producers;
    +    private ProducerConfiguration producerConfig;
    +
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(BATCHING_ENABLED);
    +        properties.add(BATCHING_MAX_MESSAGES);
    +        properties.add(BATCH_INTERVAL);
    +        properties.add(BLOCK_IF_QUEUE_FULL);
    +        properties.add(COMPRESSION_TYPE);
    +        properties.add(MESSAGE_ROUTING_MODE);
    +        properties.add(PENDING_MAX_MESSAGES);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnStopped
    +    public void cleanUp(final ProcessContext context) {
    +       // Close all of the producers and invalidate them, so they get removed from the Resource Pool
    +       getProducerCache(context).clear();
    +    }
    +
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        FlowFile flowFile = session.get();
    +
    +        if (flowFile == null)
    +            return;
    +
    +        final ComponentLog logger = getLogger();
    +        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        if (StringUtils.isBlank(topic)) {
    +            logger.error("Invalid topic specified {}", new Object[] {topic});
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        // Read the contents of the FlowFile into a byte array
    +        final byte[] messageContent = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    --- End diff --
    
    Alternatively, you can use `session.exportTo` to stream the contents into `ByteArrayOutputStream`. Your call.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174872527
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.pulsar.client.api.ClientConfiguration;
    +import org.apache.pulsar.client.api.PulsarClient;
    +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
    +import org.apache.pulsar.client.impl.auth.AuthenticationTls;
    +
    +@Tags({ "Pulsar"})
    +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.")
    +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool {
    +
    +    public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor
    +            .Builder().name("PULSAR_SERVICE_URL")
    +            .displayName("Pulsar Service URL")
    +            .description("URL for the Pulsar cluster, e.g localhost:6650")
    +            .required(true)
    +            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum concurrent lookup-requests")
    +            .description("Number of concurrent lookup-requests allowed on each broker-connection to prevent "
    +                    + "overload on broker. (default: 5000) It should be configured with higher value only in case "
    +                    + "of it requires to produce/subscribe on thousands of topics")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder()
    +            .name("Maximum connects per Pulsar broker")
    +            .description("Sets the max number of connection that the client library will open to a single broker.\n" +
    +                    "By default, the connection pool will use a single connection for all the producers and consumers. " +
    +                    "Increasing this parameter may improve throughput when using many producers over a high latency connection")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder()
    +            .name("I/O Threads")
    +            .description("The number of threads to be used for handling connections to brokers (default: 1 thread)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor LISTENER_THREADS = new PropertyDescriptor.Builder()
    +            .name("Listener Threads")
    +            .description("The number of threads to be used for message listeners (default: 1 thread)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor MAXIMUM_REJECTED_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum rejected requests per connection")
    +            .description("Max number of broker-rejected requests in a certain time-frame (30 seconds) after " +
    +                "which current connection will be closed and client creates a new connection that give " +
    +                "chance to connect a different broker (default: 50)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor OPERATION_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Operation Timeout")
    +            .description("Producer-create, subscribe and unsubscribe operations will be retried until this " +
    +                "interval, after which the operation will be maked as failed (default: 30 seconds)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("30")
    +            .build();
    +
    +    public static final PropertyDescriptor STATS_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("Stats interval")
    +            .description("The interval between each stat info (default: 60 seconds) Stats will be activated " +
    +                "with positive statsIntervalSeconds It should be set to at least 1 second")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("60")
    +            .build();
    +
    +    public static final PropertyDescriptor USE_TCP_NO_DELAY = new PropertyDescriptor.Builder()
    +            .name("Use TCP nodelay flag")
    +            .description("Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm.\n"
    +                    + "No-delay features make sure packets are sent out on the network as soon as possible, and it's critical "
    +                    + "to achieve low latency publishes. On the other hand, sending out a huge number of small packets might "
    +                    + "limit the overall throughput, so if latency is not a concern, it's advisable to set the useTcpNoDelay "
    +                    + "flag to false.")
    +            .required(false)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_PRODUCERS = new PropertyDescriptor
    +            .Builder().name("MAX_PRODUCERS")
    +            .displayName("Producer Pool Size")
    +            .description("The Maximum Number of Pulsar Producers created by this Pulsar Client Pool")
    +            .required(true)
    +            .defaultValue("10")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_CONSUMERS = new PropertyDescriptor
    +            .Builder().name("MAX_CONSUMERS")
    +            .displayName("Consumer Pool Size")
    +            .description("The Maximum Number of Pulsar consumers created by this Pulsar Client Pool")
    +            .required(true)
    +            .defaultValue("10")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    +            .name("ssl.context.service")
    +            .displayName("SSL Context Service")
    +            .description("Specifies the SSL Context Service to use for communicating with Pulsar.")
    +            .required(false)
    +            .identifiesControllerService(SSLContextService.class)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties;
    +    private volatile PulsarClient client;
    +
    +        private volatile ResourcePoolImpl<PulsarProducer> producers;
    +        private volatile ResourcePoolImpl<PulsarConsumer> consumers;
    +        private ClientConfiguration clientConfig;
    +
    +    static {
    +        final List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(PULSAR_SERVICE_URL);
    +        props.add(MAX_CONSUMERS);
    +        props.add(MAX_PRODUCERS);
    +        props.add(CONCURRENT_LOOKUP_REQUESTS);
    +        props.add(CONNECTIONS_PER_BROKER);
    +        props.add(IO_THREADS);
    +        props.add(LISTENER_THREADS);
    +        props.add(MAXIMUM_REJECTED_REQUESTS);
    +        props.add(OPERATION_TIMEOUT);
    +        props.add(STATS_INTERVAL);
    +        props.add(USE_TCP_NO_DELAY);
    +        properties = Collections.unmodifiableList(props);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    /**
    +     * @param context
    +     *            the configuration context
    +     * @throws InitializationException
    +     *             if unable to create a database connection
    +     */
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +
    +            createClient(context);
    +
    +            if (this.client == null)
    --- End diff --
    
    Please add curly brackets.


---

[GitHub] nifi issue #2553: Nifi 4908 rebase

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2553
  
    @MikeThomsen   Is there another car bundle that uses a Docker image for integration testing that I can use as an example?   
    
    I will clean up the formatting / style errors.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174872872
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.pulsar.client.api.ClientConfiguration;
    +import org.apache.pulsar.client.api.PulsarClient;
    +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
    +import org.apache.pulsar.client.impl.auth.AuthenticationTls;
    +
    +@Tags({ "Pulsar"})
    +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.")
    +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool {
    +
    +    public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor
    +            .Builder().name("PULSAR_SERVICE_URL")
    +            .displayName("Pulsar Service URL")
    +            .description("URL for the Pulsar cluster, e.g localhost:6650")
    +            .required(true)
    +            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum concurrent lookup-requests")
    +            .description("Number of concurrent lookup-requests allowed on each broker-connection to prevent "
    +                    + "overload on broker. (default: 5000) It should be configured with higher value only in case "
    +                    + "of it requires to produce/subscribe on thousands of topics")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder()
    +            .name("Maximum connects per Pulsar broker")
    +            .description("Sets the max number of connection that the client library will open to a single broker.\n" +
    +                    "By default, the connection pool will use a single connection for all the producers and consumers. " +
    +                    "Increasing this parameter may improve throughput when using many producers over a high latency connection")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder()
    +            .name("I/O Threads")
    +            .description("The number of threads to be used for handling connections to brokers (default: 1 thread)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor LISTENER_THREADS = new PropertyDescriptor.Builder()
    +            .name("Listener Threads")
    +            .description("The number of threads to be used for message listeners (default: 1 thread)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor MAXIMUM_REJECTED_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum rejected requests per connection")
    +            .description("Max number of broker-rejected requests in a certain time-frame (30 seconds) after " +
    +                "which current connection will be closed and client creates a new connection that give " +
    +                "chance to connect a different broker (default: 50)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor OPERATION_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Operation Timeout")
    +            .description("Producer-create, subscribe and unsubscribe operations will be retried until this " +
    +                "interval, after which the operation will be maked as failed (default: 30 seconds)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("30")
    +            .build();
    +
    +    public static final PropertyDescriptor STATS_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("Stats interval")
    +            .description("The interval between each stat info (default: 60 seconds) Stats will be activated " +
    +                "with positive statsIntervalSeconds It should be set to at least 1 second")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("60")
    +            .build();
    +
    +    public static final PropertyDescriptor USE_TCP_NO_DELAY = new PropertyDescriptor.Builder()
    +            .name("Use TCP nodelay flag")
    +            .description("Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm.\n"
    +                    + "No-delay features make sure packets are sent out on the network as soon as possible, and it's critical "
    +                    + "to achieve low latency publishes. On the other hand, sending out a huge number of small packets might "
    +                    + "limit the overall throughput, so if latency is not a concern, it's advisable to set the useTcpNoDelay "
    +                    + "flag to false.")
    +            .required(false)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_PRODUCERS = new PropertyDescriptor
    +            .Builder().name("MAX_PRODUCERS")
    +            .displayName("Producer Pool Size")
    +            .description("The Maximum Number of Pulsar Producers created by this Pulsar Client Pool")
    +            .required(true)
    +            .defaultValue("10")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_CONSUMERS = new PropertyDescriptor
    +            .Builder().name("MAX_CONSUMERS")
    +            .displayName("Consumer Pool Size")
    +            .description("The Maximum Number of Pulsar consumers created by this Pulsar Client Pool")
    +            .required(true)
    +            .defaultValue("10")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    +            .name("ssl.context.service")
    +            .displayName("SSL Context Service")
    +            .description("Specifies the SSL Context Service to use for communicating with Pulsar.")
    +            .required(false)
    +            .identifiesControllerService(SSLContextService.class)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties;
    +    private volatile PulsarClient client;
    +
    +        private volatile ResourcePoolImpl<PulsarProducer> producers;
    +        private volatile ResourcePoolImpl<PulsarConsumer> consumers;
    +        private ClientConfiguration clientConfig;
    +
    +    static {
    +        final List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(PULSAR_SERVICE_URL);
    +        props.add(MAX_CONSUMERS);
    +        props.add(MAX_PRODUCERS);
    +        props.add(CONCURRENT_LOOKUP_REQUESTS);
    +        props.add(CONNECTIONS_PER_BROKER);
    +        props.add(IO_THREADS);
    +        props.add(LISTENER_THREADS);
    +        props.add(MAXIMUM_REJECTED_REQUESTS);
    +        props.add(OPERATION_TIMEOUT);
    +        props.add(STATS_INTERVAL);
    +        props.add(USE_TCP_NO_DELAY);
    +        properties = Collections.unmodifiableList(props);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    /**
    +     * @param context
    +     *            the configuration context
    +     * @throws InitializationException
    +     *             if unable to create a database connection
    +     */
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +
    +            createClient(context);
    +
    +            if (this.client == null)
    +                throw new InitializationException("Unable to create Pulsar Client");
    +
    +            producers = new ResourcePoolImpl<PulsarProducer>(new PulsarProducerFactory(client), context.getProperty(MAX_PRODUCERS).asInteger());
    +            consumers = new ResourcePoolImpl<PulsarConsumer>(new PulsarConsumerFactory(client), context.getProperty(MAX_CONSUMERS).asInteger());
    +
    +    }
    +
    +    private void createClient(final ConfigurationContext context) throws InitializationException {
    +
    +            // We can't create a client without a service URL.
    +            if (!context.getProperty(PULSAR_SERVICE_URL).isSet()) {
    --- End diff --
    
    I don't think this is necessary because you made the service URL property required.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174866298
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/PoolableResource.java ---
    @@ -0,0 +1,25 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar.pool;
    +
    +public interface PoolableResource {
    +
    +    public void close();
    --- End diff --
    
    Javadoc.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174874448
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/pool/ResourcePoolImpl.java ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar.pool;
    +
    +import java.util.Iterator;
    +import java.util.Properties;
    +import java.util.Vector;
    +import java.util.concurrent.locks.Condition;
    +import java.util.concurrent.locks.Lock;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +
    +public class ResourcePoolImpl<R extends PoolableResource> implements ResourcePool<R> {
    +
    +    private final Lock lock = new ReentrantLock();
    +    private final Condition poolAvailable = lock.newCondition();
    +    private int max_resources;
    +    private final Vector<R> pool;
    +
    +    private final ResourceExceptionHandler resourceExceptionHandler;
    +    private final ResourceFactory<R> factory;
    +
    +    public ResourcePoolImpl(ResourceFactory<R> factory, int max_resources) {
    +            this(factory, new ResourceExceptionHandlerImpl(), max_resources);
    +    }
    +
    +    public ResourcePoolImpl(ResourceFactory<R> factory, ResourceExceptionHandler handler, int max_resources) {
    +        lock.lock();
    +        try {
    +            this.factory = factory;
    +            this.resourceExceptionHandler = handler;
    +            this.max_resources = max_resources;
    +            this.pool = new Vector<R>(max_resources);
    +        } finally {
    +            lock.unlock();
    +        }
    +    }
    +
    +    private R createResource(Properties props) {
    +        R resource = null;
    +        try {
    +
    +            resource = factory.create(props);
    --- End diff --
    
    There's some extraneous white space around this.


---

[GitHub] nifi issue #2553: Nifi 4908 rebase

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2553
  
    Thanks for the update.....I have merged my changes that came from your code review into the NIFI-4914-rebase PR as well.  That PR has extends the base Pulsar processor capabilities to include RecordBased processing as well 


---

[GitHub] nifi issue #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2553
  
    @david-streamlio thanks for the link.
    
    > Is there another car bundle that uses a Docker image for integration testing that I can use as an example?
    
    The Mongo package, sorta. We wrote all of the tests to assume Mongo defaults, so if you set up a simple Docker install of Mongo, all of the defaults will just click between the image and the int tests.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio closed the pull request at:

    https://github.com/apache/nifi/pull/2553


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174879478
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorCompletionService;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.nifi.pulsar.PulsarConsumer;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.ConsumerConfiguration;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +import org.apache.pulsar.client.api.SubscriptionType;
    +
    +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@CapabilityDescription("Consumes messages from Apache Pulsar "
    +        + "The complementary NiFi processor for sending messages is PublishPulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
    --- End diff --
    
    You should consider changing this to `ConsumePulsar_1_X` to warn users that you may be moving the internal client compatibility forward if let's say 1.4 breaks compatibility with the current 1.2 branch in the incubator.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174878122
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/pool/ResourcePoolImpl.java ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar.pool;
    +
    +import java.util.Iterator;
    +import java.util.Properties;
    +import java.util.Vector;
    +import java.util.concurrent.locks.Condition;
    +import java.util.concurrent.locks.Lock;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +
    +public class ResourcePoolImpl<R extends PoolableResource> implements ResourcePool<R> {
    +
    +    private final Lock lock = new ReentrantLock();
    +    private final Condition poolAvailable = lock.newCondition();
    +    private int max_resources;
    +    private final Vector<R> pool;
    +
    +    private final ResourceExceptionHandler resourceExceptionHandler;
    +    private final ResourceFactory<R> factory;
    +
    +    public ResourcePoolImpl(ResourceFactory<R> factory, int max_resources) {
    +            this(factory, new ResourceExceptionHandlerImpl(), max_resources);
    +    }
    +
    +    public ResourcePoolImpl(ResourceFactory<R> factory, ResourceExceptionHandler handler, int max_resources) {
    +        lock.lock();
    +        try {
    +            this.factory = factory;
    +            this.resourceExceptionHandler = handler;
    +            this.max_resources = max_resources;
    +            this.pool = new Vector<R>(max_resources);
    +        } finally {
    +            lock.unlock();
    +        }
    +    }
    +
    +    private R createResource(Properties props) {
    +        R resource = null;
    +        try {
    +
    +            resource = factory.create(props);
    +
    +            if (resource == null)
    +                throw new ResourceCreationException("Unable to create resource");
    +
    +        } catch (Exception e) {
    +            resourceExceptionHandler.handle(e);
    +        }
    +        return resource;
    +    }
    +
    +
    +    /*
    +     * Shutdown the pool and release the resources
    +     */
    +    public void close() {
    +
    +        Iterator<R> itr = pool.iterator();
    +        while (itr.hasNext()) {
    +            itr.next().close();
    +        }
    +
    +    }
    +
    +    public boolean isEmpty() {
    +            return (pool.isEmpty());
    +    }
    +
    +    public boolean isFull() {
    +        return (pool != null && pool.size() == max_resources);
    +    }
    +
    +    @Override
    +    public R acquire(Properties props) throws InterruptedException {
    +        lock.lock();
    +        try {
    +            while (max_resources <= 0) {
    +                poolAvailable.await();
    +            }
    +
    +            --max_resources;
    +
    +            if (pool != null) {
    +                int size = pool.size();
    +                if (size > 0)
    +                    return pool.remove(size - 1);
    +            }
    +            return createResource(props);
    +        } finally {
    +            lock.unlock();
    +        }
    +    }
    +
    +    @Override
    +    public void evict(R resource) {
    +        lock.lock();
    +        try {
    +
    +            // Attempt to close the connection
    +            if (!resource.isClosed())
    --- End diff --
    
    Curly brackets.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174882988
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorCompletionService;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.nifi.pulsar.PulsarConsumer;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.ConsumerConfiguration;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +import org.apache.pulsar.client.api.SubscriptionType;
    +
    +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@CapabilityDescription("Consumes messages from Apache Pulsar "
    +        + "The complementary NiFi processor for sending messages is PublishPulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
    +
    +    static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name");
    +    static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages");
    +    static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer "
    +            + "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages");
    +
    +    protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
    +            .name("topic")
    +            .displayName("Topic Name")
    +            .description("The name of the Pulsar Topic.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
    +            .name("Subscription")
    +            .displayName("Subscription Name")
    +            .description("The name of the Pulsar subscription to consume from.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor ASYNC_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Async Enabled")
    +            .description("Control whether the messages will be consumed asyncronously or not. Messages consumed"
    +                    + " syncronously will be acknowledged immediately before processing the next message, while"
    +                    + " asyncronous messages will be acknowledged after the Pulsar broker responds.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_ASYNC_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum Async Requests")
    +            .description("The maximum number of outstanding asynchronous consumer requests for this processor. "
    +                    + "Each asynchronous call requires memory, so avoid setting this value to high.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor ACK_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Acknowledgment Timeout")
    +            .description("Set the timeout (in milliseconds) for unacked messages, truncated to the "
    +                    + "nearest millisecond. The timeout needs to be greater than 10 seconds.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .defaultValue("10000")
    +            .build();
    +
    +    public static final PropertyDescriptor PRIORITY_LEVEL = new PropertyDescriptor.Builder()
    +            .name("Consumer Priority Level")
    +            .description("Sets priority level for the shared subscription consumers to which broker "
    +                    + "gives more priority while dispatching messages. Here, broker follows descending "
    +                    + "priorities. (eg: 0=max-priority, 1, 2,..) ")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5")
    +            .build();
    +
    +    public static final PropertyDescriptor RECEIVER_QUEUE_SIZE = new PropertyDescriptor.Builder()
    +            .name("Consumer receive queue size.")
    +            .description("The consumer receive queue controls how many messages can be accumulated "
    +                    + "by the Consumer before the application calls Consumer.receive(). Using a higher "
    +                    + "value could potentially increase the consumer throughput at the expense of bigger "
    +                    + "memory utilization. \n"
    +                    + "Setting the consumer queue size as zero, \n"
    +                    + "\t - Decreases the throughput of the consumer, by disabling pre-fetching of messages. \n"
    +                    + "\t - Doesn't support Batch-Message: if consumer receives any batch-message then it closes consumer "
    +                    + "connection with broker and consumer will not be able receive any further message unless batch-message "
    +                    + "in pipeline is removed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final PropertyDescriptor SUBSCRIPTION_TYPE = new PropertyDescriptor.Builder()
    +            .name("Subscription Type")
    +            .description("Select the subscription type to be used when subscribing to the topic.")
    +            .required(false)
    +            .allowableValues(EXCLUSIVE, SHARED, FAILOVER)
    +            .defaultValue(SHARED.getValue())
    +            .build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    // Reuse the same consumer for a given topic / subscription
    +    private PulsarConsumer consumer;
    +    private ConsumerConfiguration consumerConfig;
    +
    +    // Pool for running multiple consume Async requests
    +    ExecutorService pool;
    +    ExecutorCompletionService<Message> completionService;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void init(ProcessContext context) {
    +            pool = Executors.newFixedThreadPool(context.getProperty(MAX_ASYNC_REQUESTS).asInteger());
    +            completionService = new ExecutorCompletionService<>(pool);
    +    }
    +
    +    @OnUnscheduled
    +    public void shutDown() {
    +            // Stop all the async consumers
    +            pool.shutdownNow();
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        try {
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +                // Launch consumers
    +                consumeAsync(context, session);
    +
    +                // Handle completed consumers
    +                handleAsync(context, session);
    +
    +            } else {
    +                consume(context, session);
    +            }
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +
    +    }
    +
    +    private void handleAsync(ProcessContext context, ProcessSession session) {
    +
    +        try {
    +            Future<Message> done = completionService.take();
    +            Message msg = done.get();
    +
    +            if (msg != null) {
    +                FlowFile flowFile = null;
    +                    final byte[] value = msg.getData();
    +                    if (value != null && value.length > 0) {
    +                        flowFile = session.create();
    +                        flowFile = session.write(flowFile, out -> {
    +                            out.write(value);
    +                        });
    +                    }
    +
    +                    session.getProvenanceReporter().receive(flowFile, "From " + context.getProperty(TOPIC).getValue());
    +                    session.transfer(flowFile, REL_SUCCESS);
    +                    session.commit();
    +                    getWrappedConsumer(context).getConsumer().acknowledgeAsync(msg);
    +            }
    +
    +        } catch (InterruptedException | ExecutionException | PulsarClientException e) {
    +            getLogger().error("Trouble consuming messages ", e);
    +        }
    +
    +    }
    +
    +    @OnStopped
    +    public void close(final ProcessContext context) {
    +
    +        getLogger().info("Disconnecting Pulsar Consumer");
    +        if (consumer != null) {
    +
    +                context.getProperty(PULSAR_CLIENT_SERVICE)
    +                    .asControllerService(PulsarClientPool.class)
    +                    .getConsumerPool().evict(consumer);
    +        }
    +
    +        consumer = null;
    +    }
    +
    +    /*
    +     * For now let's assume that this processor will be configured to run for a longer
    +     * duration than 0 milliseconds. So we will be grabbing as many messages off the topic
    +     * as possible and committing them as FlowFiles
    +     */
    +    private void consumeAsync(ProcessContext context, ProcessSession session) throws PulsarClientException {
    +
    +            Consumer consumer = getWrappedConsumer(context).getConsumer();
    +
    +            completionService.submit(new Callable<Message>() {
    +                @Override
    +                public Message call() throws Exception {
    +                        return consumer.receiveAsync().get();
    +                }
    +              });
    +
    +    }
    +
    +    /*
    +     * When this Processor expects to receive many small files, it may
    +     * be advisable to create several FlowFiles from a single session
    +     * before committing the session. Typically, this allows the Framework
    +     * to treat the content of the newly created FlowFiles much more efficiently.
    +     */
    +    private void consume(ProcessContext context, ProcessSession session) throws PulsarClientException {
    +
    +        Consumer consumer = getWrappedConsumer(context).getConsumer();
    +
    +        final ComponentLog logger = getLogger();
    +        final Message msg;
    +        FlowFile flowFile = null;
    +
    +        try {
    +
    +                msg = consumer.receive();
    +                final byte[] value = msg.getData();
    +
    +                if (value != null && value.length > 0) {
    +                    flowFile = session.create();
    +                    flowFile = session.write(flowFile, out -> {
    +                        out.write(value);
    +                    });
    +
    +                    session.getProvenanceReporter().receive(flowFile, "From " + context.getProperty(TOPIC).getValue());
    +                session.transfer(flowFile, REL_SUCCESS);
    +                logger.info("Created {} from {} messages received from Pulsar Server and transferred to 'success'",
    +                            new Object[]{flowFile, 1});
    +
    +                session.commit();
    +
    +                /*
    +                 * This Processor acknowledges receipt of the data and/or removes the data
    +                 * from the external source in order to prevent receipt of duplicate files.
    +                 * This is done only after the ProcessSession by which the FlowFile was created
    +                 * has been committed! Failure to adhere to this principle may result in data
    +                 * loss, as restarting NiFi before the session has been committed will result
    +                 * in the temporary file being deleted. Note, however, that it is possible using
    +                 * this approach to receive duplicate data because the application could be
    +                 * restarted after committing the session and before acknowledging or removing
    +                 * the data from the external source. In general, though, potential data duplication
    +                 * is preferred over potential data loss.
    +                 */
    +                getLogger().info("Acknowledging message " + msg.getMessageId());
    +                consumer.acknowledge(msg);
    +
    +                } else {
    +                    // We didn't consume any data, so
    +                    session.commit();
    +                }
    +
    +        } catch (PulsarClientException e) {
    +            context.yield();
    +            session.rollback();
    +        }
    +
    +    }
    +
    +    private PulsarConsumer getWrappedConsumer(ProcessContext context) throws PulsarClientException {
    +
    +        if (consumer != null)
    +            return consumer;
    +
    +        final PulsarClientPool pulsarClientService = context.getProperty(PULSAR_CLIENT_SERVICE)
    +                .asControllerService(PulsarClientPool.class);
    +
    +        try {
    +            consumer = pulsarClientService.getConsumerPool()
    --- End diff --
    
    Same issue here. I think you shouldn't be exposing any of the internals from the controller service. Bear in mind too that you can access controller services from scripts, so you might want to think defensively here so that you can reduce the opportunities for users to later do stupid things with your service.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174881132
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorCompletionService;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.nifi.pulsar.PulsarConsumer;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.ConsumerConfiguration;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +import org.apache.pulsar.client.api.SubscriptionType;
    +
    +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@CapabilityDescription("Consumes messages from Apache Pulsar "
    +        + "The complementary NiFi processor for sending messages is PublishPulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
    +
    +    static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name");
    +    static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages");
    +    static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer "
    +            + "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages");
    +
    +    protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
    +            .name("topic")
    +            .displayName("Topic Name")
    +            .description("The name of the Pulsar Topic.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
    +            .name("Subscription")
    +            .displayName("Subscription Name")
    +            .description("The name of the Pulsar subscription to consume from.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor ASYNC_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Async Enabled")
    +            .description("Control whether the messages will be consumed asyncronously or not. Messages consumed"
    +                    + " syncronously will be acknowledged immediately before processing the next message, while"
    +                    + " asyncronous messages will be acknowledged after the Pulsar broker responds.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_ASYNC_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum Async Requests")
    +            .description("The maximum number of outstanding asynchronous consumer requests for this processor. "
    +                    + "Each asynchronous call requires memory, so avoid setting this value to high.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor ACK_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Acknowledgment Timeout")
    +            .description("Set the timeout (in milliseconds) for unacked messages, truncated to the "
    +                    + "nearest millisecond. The timeout needs to be greater than 10 seconds.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .defaultValue("10000")
    +            .build();
    +
    +    public static final PropertyDescriptor PRIORITY_LEVEL = new PropertyDescriptor.Builder()
    +            .name("Consumer Priority Level")
    +            .description("Sets priority level for the shared subscription consumers to which broker "
    +                    + "gives more priority while dispatching messages. Here, broker follows descending "
    +                    + "priorities. (eg: 0=max-priority, 1, 2,..) ")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5")
    +            .build();
    +
    +    public static final PropertyDescriptor RECEIVER_QUEUE_SIZE = new PropertyDescriptor.Builder()
    +            .name("Consumer receive queue size.")
    +            .description("The consumer receive queue controls how many messages can be accumulated "
    +                    + "by the Consumer before the application calls Consumer.receive(). Using a higher "
    +                    + "value could potentially increase the consumer throughput at the expense of bigger "
    +                    + "memory utilization. \n"
    +                    + "Setting the consumer queue size as zero, \n"
    +                    + "\t - Decreases the throughput of the consumer, by disabling pre-fetching of messages. \n"
    +                    + "\t - Doesn't support Batch-Message: if consumer receives any batch-message then it closes consumer "
    +                    + "connection with broker and consumer will not be able receive any further message unless batch-message "
    +                    + "in pipeline is removed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final PropertyDescriptor SUBSCRIPTION_TYPE = new PropertyDescriptor.Builder()
    +            .name("Subscription Type")
    +            .description("Select the subscription type to be used when subscribing to the topic.")
    +            .required(false)
    +            .allowableValues(EXCLUSIVE, SHARED, FAILOVER)
    +            .defaultValue(SHARED.getValue())
    +            .build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    // Reuse the same consumer for a given topic / subscription
    +    private PulsarConsumer consumer;
    +    private ConsumerConfiguration consumerConfig;
    +
    +    // Pool for running multiple consume Async requests
    +    ExecutorService pool;
    +    ExecutorCompletionService<Message> completionService;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void init(ProcessContext context) {
    +            pool = Executors.newFixedThreadPool(context.getProperty(MAX_ASYNC_REQUESTS).asInteger());
    +            completionService = new ExecutorCompletionService<>(pool);
    +    }
    +
    +    @OnUnscheduled
    +    public void shutDown() {
    +            // Stop all the async consumers
    +            pool.shutdownNow();
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        try {
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +                // Launch consumers
    +                consumeAsync(context, session);
    +
    +                // Handle completed consumers
    +                handleAsync(context, session);
    +
    +            } else {
    +                consume(context, session);
    +            }
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +
    +    }
    +
    +    private void handleAsync(ProcessContext context, ProcessSession session) {
    +
    +        try {
    +            Future<Message> done = completionService.take();
    +            Message msg = done.get();
    +
    +            if (msg != null) {
    +                FlowFile flowFile = null;
    +                    final byte[] value = msg.getData();
    --- End diff --
    
    Whole indentation level looks off here.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174884100
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java ---
    @@ -0,0 +1,373 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.nifi.pulsar.PulsarProducer;
    +import org.apache.nifi.pulsar.cache.LRUCache;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.pulsar.client.api.CompressionType;
    +import org.apache.pulsar.client.api.MessageId;
    +import org.apache.pulsar.client.api.Producer;
    +import org.apache.pulsar.client.api.ProducerConfiguration;
    +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +
    +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
    +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the Pulsar 1.21 Producer API."
    +    + "The messages to send may be individual FlowFiles or may be delimited, using a "
    +    + "user-specified delimiter, such as a new-line. "
    +    + "The complementary NiFi processor for fetching messages is ConsumePulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to "
    +        + "FlowFiles that are routed to success.")
    +public class PublishPulsar_1_0 extends AbstractPulsarProcessor {
    +
    +    protected static final String MSG_COUNT = "msg.count";
    +
    +    static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression");
    +    static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
    +    static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
    +
    +    static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition");
    +    static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route messages to all "
    +                                                                                                                       + "partitions in a round robin manner");
    +    static final AllowableValue MESSAGE_ROUTING_MODE_SINGLE_PARTITION = new AllowableValue("SinglePartition", "Single Partition", "Route messages to a single partition");
    +
    +    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
    +            .name("topic")
    +            .displayName("Topic Name")
    +            .description("The name of the Pulsar Topic.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor ASYNC_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Async Enabled")
    +            .description("Control whether the messages will be sent asyncronously or not. Messages sent"
    +                    + " syncronously will be acknowledged immediately before processing the next message, while"
    +                    + " asyncronous messages will be acknowledged after the Pulsar broker responds.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCHING_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Batching Enabled")
    +            .description("Control whether automatic batching of messages is enabled for the producer. "
    +                    + "default: false [No batching] When batching is enabled, multiple calls to "
    +                    + "Producer.sendAsync can result in a single batch to be sent to the broker, leading "
    +                    + "to better throughput, especially when publishing small messages. If compression is "
    +                    + "enabled, messages will be compressed at the batch level, leading to a much better "
    +                    + "compression ratio for similar headers or contents. When enabled default batch delay "
    +                    + "is set to 10 ms and default batch size is 1000 messages")
    +            .required(false)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCHING_MAX_MESSAGES = new PropertyDescriptor.Builder()
    +            .name("Batching Max Messages")
    +            .description("Set the maximum number of messages permitted in a batch. default: "
    +                    + "1000 If set to a value greater than 1, messages will be queued until this "
    +                    + "threshold is reached or batch interval has elapsed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("Batch Interval")
    +            .description("Set the time period within which the messages sent will be batched default: 10ms "
    +                    + "if batch messages are enabled. If set to a non zero value, messages will be queued until "
    +                    + "this time interval or until the Batching Max Messages threshould has been reached")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .defaultValue("10")
    +            .build();
    +
    +    public static final PropertyDescriptor BLOCK_IF_QUEUE_FULL = new PropertyDescriptor.Builder()
    +            .name("Block if Message Queue Full")
    +            .description("Set whether the processor should block when the outgoing message queue is full. "
    +                    + "Default is false. If set to false, send operations will immediately fail with "
    +                    + "ProducerQueueIsFullError when there is no space left in pending queue.")
    +            .required(false)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
    +            .name("Compression Type")
    +            .description("Set the compression type for the producer.")
    +            .required(false)
    +            .allowableValues(COMPRESSION_TYPE_NONE, COMPRESSION_TYPE_LZ4, COMPRESSION_TYPE_ZLIB)
    +            .defaultValue(COMPRESSION_TYPE_NONE.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor MESSAGE_ROUTING_MODE = new PropertyDescriptor.Builder()
    +            .name("Message Routing Mode")
    +            .description("Set the message routing mode for the producer. This applies only if the destination topic is partitioned")
    +            .required(false)
    +            .allowableValues(MESSAGE_ROUTING_MODE_CUSTOM_PARTITION, MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION, MESSAGE_ROUTING_MODE_SINGLE_PARTITION)
    +            .defaultValue(MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor PENDING_MAX_MESSAGES = new PropertyDescriptor.Builder()
    +            .name("Max Pending Messages")
    +            .description("Set the max size of the queue holding the messages pending to receive an "
    +                    + "acknowledgment from the broker.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    private LRUCache<String, PulsarProducer> producers;
    +    private ProducerConfiguration producerConfig;
    +
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(BATCHING_ENABLED);
    +        properties.add(BATCHING_MAX_MESSAGES);
    +        properties.add(BATCH_INTERVAL);
    +        properties.add(BLOCK_IF_QUEUE_FULL);
    +        properties.add(COMPRESSION_TYPE);
    +        properties.add(MESSAGE_ROUTING_MODE);
    +        properties.add(PENDING_MAX_MESSAGES);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnStopped
    +    public void cleanUp(final ProcessContext context) {
    +       // Close all of the producers and invalidate them, so they get removed from the Resource Pool
    +       getProducerCache(context).clear();
    +    }
    +
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        FlowFile flowFile = session.get();
    +
    +        if (flowFile == null)
    --- End diff --
    
    Curly brackets.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174874403
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/pool/ResourcePoolImpl.java ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar.pool;
    +
    +import java.util.Iterator;
    +import java.util.Properties;
    +import java.util.Vector;
    +import java.util.concurrent.locks.Condition;
    +import java.util.concurrent.locks.Lock;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +
    +public class ResourcePoolImpl<R extends PoolableResource> implements ResourcePool<R> {
    +
    +    private final Lock lock = new ReentrantLock();
    +    private final Condition poolAvailable = lock.newCondition();
    +    private int max_resources;
    +    private final Vector<R> pool;
    +
    +    private final ResourceExceptionHandler resourceExceptionHandler;
    +    private final ResourceFactory<R> factory;
    +
    +    public ResourcePoolImpl(ResourceFactory<R> factory, int max_resources) {
    +            this(factory, new ResourceExceptionHandlerImpl(), max_resources);
    +    }
    +
    +    public ResourcePoolImpl(ResourceFactory<R> factory, ResourceExceptionHandler handler, int max_resources) {
    +        lock.lock();
    +        try {
    +            this.factory = factory;
    +            this.resourceExceptionHandler = handler;
    +            this.max_resources = max_resources;
    +            this.pool = new Vector<R>(max_resources);
    +        } finally {
    +            lock.unlock();
    +        }
    +    }
    +
    +    private R createResource(Properties props) {
    +        R resource = null;
    +        try {
    +
    +            resource = factory.create(props);
    +
    +            if (resource == null)
    --- End diff --
    
    Curly brackets.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174884957
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java ---
    @@ -0,0 +1,373 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.nifi.pulsar.PulsarProducer;
    +import org.apache.nifi.pulsar.cache.LRUCache;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.pulsar.client.api.CompressionType;
    +import org.apache.pulsar.client.api.MessageId;
    +import org.apache.pulsar.client.api.Producer;
    +import org.apache.pulsar.client.api.ProducerConfiguration;
    +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +
    +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
    +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the Pulsar 1.21 Producer API."
    +    + "The messages to send may be individual FlowFiles or may be delimited, using a "
    +    + "user-specified delimiter, such as a new-line. "
    +    + "The complementary NiFi processor for fetching messages is ConsumePulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to "
    +        + "FlowFiles that are routed to success.")
    +public class PublishPulsar_1_0 extends AbstractPulsarProcessor {
    +
    +    protected static final String MSG_COUNT = "msg.count";
    +
    +    static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression");
    +    static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
    +    static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
    +
    +    static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition");
    +    static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route messages to all "
    +                                                                                                                       + "partitions in a round robin manner");
    +    static final AllowableValue MESSAGE_ROUTING_MODE_SINGLE_PARTITION = new AllowableValue("SinglePartition", "Single Partition", "Route messages to a single partition");
    +
    +    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
    +            .name("topic")
    +            .displayName("Topic Name")
    +            .description("The name of the Pulsar Topic.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor ASYNC_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Async Enabled")
    +            .description("Control whether the messages will be sent asyncronously or not. Messages sent"
    +                    + " syncronously will be acknowledged immediately before processing the next message, while"
    +                    + " asyncronous messages will be acknowledged after the Pulsar broker responds.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCHING_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Batching Enabled")
    +            .description("Control whether automatic batching of messages is enabled for the producer. "
    +                    + "default: false [No batching] When batching is enabled, multiple calls to "
    +                    + "Producer.sendAsync can result in a single batch to be sent to the broker, leading "
    +                    + "to better throughput, especially when publishing small messages. If compression is "
    +                    + "enabled, messages will be compressed at the batch level, leading to a much better "
    +                    + "compression ratio for similar headers or contents. When enabled default batch delay "
    +                    + "is set to 10 ms and default batch size is 1000 messages")
    +            .required(false)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCHING_MAX_MESSAGES = new PropertyDescriptor.Builder()
    +            .name("Batching Max Messages")
    +            .description("Set the maximum number of messages permitted in a batch. default: "
    +                    + "1000 If set to a value greater than 1, messages will be queued until this "
    +                    + "threshold is reached or batch interval has elapsed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("Batch Interval")
    +            .description("Set the time period within which the messages sent will be batched default: 10ms "
    +                    + "if batch messages are enabled. If set to a non zero value, messages will be queued until "
    +                    + "this time interval or until the Batching Max Messages threshould has been reached")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .defaultValue("10")
    +            .build();
    +
    +    public static final PropertyDescriptor BLOCK_IF_QUEUE_FULL = new PropertyDescriptor.Builder()
    +            .name("Block if Message Queue Full")
    +            .description("Set whether the processor should block when the outgoing message queue is full. "
    +                    + "Default is false. If set to false, send operations will immediately fail with "
    +                    + "ProducerQueueIsFullError when there is no space left in pending queue.")
    +            .required(false)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
    +            .name("Compression Type")
    +            .description("Set the compression type for the producer.")
    +            .required(false)
    +            .allowableValues(COMPRESSION_TYPE_NONE, COMPRESSION_TYPE_LZ4, COMPRESSION_TYPE_ZLIB)
    +            .defaultValue(COMPRESSION_TYPE_NONE.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor MESSAGE_ROUTING_MODE = new PropertyDescriptor.Builder()
    +            .name("Message Routing Mode")
    +            .description("Set the message routing mode for the producer. This applies only if the destination topic is partitioned")
    +            .required(false)
    +            .allowableValues(MESSAGE_ROUTING_MODE_CUSTOM_PARTITION, MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION, MESSAGE_ROUTING_MODE_SINGLE_PARTITION)
    +            .defaultValue(MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor PENDING_MAX_MESSAGES = new PropertyDescriptor.Builder()
    +            .name("Max Pending Messages")
    +            .description("Set the max size of the queue holding the messages pending to receive an "
    +                    + "acknowledgment from the broker.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    private LRUCache<String, PulsarProducer> producers;
    +    private ProducerConfiguration producerConfig;
    +
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(BATCHING_ENABLED);
    +        properties.add(BATCHING_MAX_MESSAGES);
    +        properties.add(BATCH_INTERVAL);
    +        properties.add(BLOCK_IF_QUEUE_FULL);
    +        properties.add(COMPRESSION_TYPE);
    +        properties.add(MESSAGE_ROUTING_MODE);
    +        properties.add(PENDING_MAX_MESSAGES);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnStopped
    +    public void cleanUp(final ProcessContext context) {
    +       // Close all of the producers and invalidate them, so they get removed from the Resource Pool
    +       getProducerCache(context).clear();
    +    }
    +
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        FlowFile flowFile = session.get();
    +
    +        if (flowFile == null)
    +            return;
    +
    +        final ComponentLog logger = getLogger();
    +        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        if (StringUtils.isBlank(topic)) {
    +            logger.error("Invalid topic specified {}", new Object[] {topic});
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        // Read the contents of the FlowFile into a byte array
    +        final byte[] messageContent = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, messageContent, true);
    +            }
    +        });
    +
    +        // Nothing to do, so skip this Flow file.
    +        if (messageContent == null || messageContent.length < 1) {
    +            session.transfer(flowFile, REL_SUCCESS);
    +            return;
    +        }
    +
    +        try {
    +
    +            Producer producer = getWrappedProducer(topic, context).getProducer();
    +
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    --- End diff --
    
    isSet() is not required here if it's a required field.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174866791
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/ResourceExceptionHandler.java ---
    @@ -0,0 +1,23 @@
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar.pool;
    +
    +public interface ResourceExceptionHandler {
    +
    +    void handle(Exception exc);
    --- End diff --
    
    Javadoc


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174865149
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/pom.xml ---
    @@ -0,0 +1,40 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-pulsar-bundle</artifactId>
    +        <version>1.6.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-pulsar-client-service-api</artifactId>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-api</artifactId>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +    			<groupId>org.apache.pulsar</groupId>
    --- End diff --
    
    Nit: indent level is broken here.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174865373
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java ---
    @@ -0,0 +1,33 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.controller.ControllerService;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +
    +
    +@Tags({"Pulsar"})
    +@CapabilityDescription("Provides the ability to create Pulsar Producer / Consumer instances on demand, based on the configuration."
    +                     + "properties defined")
    +public interface PulsarClientPool extends ControllerService {
    +
    +    public ResourcePool<PulsarProducer> getProducerPool();
    +
    +    public ResourcePool<PulsarConsumer> getConsumerPool();
    --- End diff --
    
    Same here.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174865317
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java ---
    @@ -0,0 +1,33 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.controller.ControllerService;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +
    +
    +@Tags({"Pulsar"})
    +@CapabilityDescription("Provides the ability to create Pulsar Producer / Consumer instances on demand, based on the configuration."
    +                     + "properties defined")
    +public interface PulsarClientPool extends ControllerService {
    +
    +    public ResourcePool<PulsarProducer> getProducerPool();
    --- End diff --
    
    There should be a basic javadoc here.


---

[GitHub] nifi issue #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2553
  
    @david-streamlio I'm going to try to get back to this very soon.


---

[GitHub] nifi issue #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2553
  
    You should strongly consider setting up some integration tests that can be run against a simple Docker image. All you have to do with NiFi to get that done is add a few classes with "IT" at the end of their name and you can run them with `mvn integration-test -Pintegration-tests`.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174874129
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.pulsar.client.api.ClientConfiguration;
    +import org.apache.pulsar.client.api.PulsarClient;
    +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
    +import org.apache.pulsar.client.impl.auth.AuthenticationTls;
    +
    +@Tags({ "Pulsar"})
    +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.")
    +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool {
    +
    +    public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor
    +            .Builder().name("PULSAR_SERVICE_URL")
    +            .displayName("Pulsar Service URL")
    +            .description("URL for the Pulsar cluster, e.g localhost:6650")
    +            .required(true)
    +            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum concurrent lookup-requests")
    +            .description("Number of concurrent lookup-requests allowed on each broker-connection to prevent "
    +                    + "overload on broker. (default: 5000) It should be configured with higher value only in case "
    +                    + "of it requires to produce/subscribe on thousands of topics")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder()
    +            .name("Maximum connects per Pulsar broker")
    +            .description("Sets the max number of connection that the client library will open to a single broker.\n" +
    +                    "By default, the connection pool will use a single connection for all the producers and consumers. " +
    +                    "Increasing this parameter may improve throughput when using many producers over a high latency connection")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder()
    +            .name("I/O Threads")
    +            .description("The number of threads to be used for handling connections to brokers (default: 1 thread)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor LISTENER_THREADS = new PropertyDescriptor.Builder()
    +            .name("Listener Threads")
    +            .description("The number of threads to be used for message listeners (default: 1 thread)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor MAXIMUM_REJECTED_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum rejected requests per connection")
    +            .description("Max number of broker-rejected requests in a certain time-frame (30 seconds) after " +
    +                "which current connection will be closed and client creates a new connection that give " +
    +                "chance to connect a different broker (default: 50)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor OPERATION_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Operation Timeout")
    +            .description("Producer-create, subscribe and unsubscribe operations will be retried until this " +
    +                "interval, after which the operation will be maked as failed (default: 30 seconds)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("30")
    +            .build();
    +
    +    public static final PropertyDescriptor STATS_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("Stats interval")
    +            .description("The interval between each stat info (default: 60 seconds) Stats will be activated " +
    +                "with positive statsIntervalSeconds It should be set to at least 1 second")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("60")
    +            .build();
    +
    +    public static final PropertyDescriptor USE_TCP_NO_DELAY = new PropertyDescriptor.Builder()
    +            .name("Use TCP nodelay flag")
    +            .description("Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm.\n"
    +                    + "No-delay features make sure packets are sent out on the network as soon as possible, and it's critical "
    +                    + "to achieve low latency publishes. On the other hand, sending out a huge number of small packets might "
    +                    + "limit the overall throughput, so if latency is not a concern, it's advisable to set the useTcpNoDelay "
    +                    + "flag to false.")
    +            .required(false)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_PRODUCERS = new PropertyDescriptor
    +            .Builder().name("MAX_PRODUCERS")
    +            .displayName("Producer Pool Size")
    +            .description("The Maximum Number of Pulsar Producers created by this Pulsar Client Pool")
    +            .required(true)
    +            .defaultValue("10")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_CONSUMERS = new PropertyDescriptor
    +            .Builder().name("MAX_CONSUMERS")
    +            .displayName("Consumer Pool Size")
    +            .description("The Maximum Number of Pulsar consumers created by this Pulsar Client Pool")
    +            .required(true)
    +            .defaultValue("10")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    +            .name("ssl.context.service")
    +            .displayName("SSL Context Service")
    +            .description("Specifies the SSL Context Service to use for communicating with Pulsar.")
    +            .required(false)
    +            .identifiesControllerService(SSLContextService.class)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties;
    +    private volatile PulsarClient client;
    +
    +        private volatile ResourcePoolImpl<PulsarProducer> producers;
    +        private volatile ResourcePoolImpl<PulsarConsumer> consumers;
    +        private ClientConfiguration clientConfig;
    +
    +    static {
    +        final List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(PULSAR_SERVICE_URL);
    +        props.add(MAX_CONSUMERS);
    +        props.add(MAX_PRODUCERS);
    +        props.add(CONCURRENT_LOOKUP_REQUESTS);
    +        props.add(CONNECTIONS_PER_BROKER);
    +        props.add(IO_THREADS);
    +        props.add(LISTENER_THREADS);
    +        props.add(MAXIMUM_REJECTED_REQUESTS);
    +        props.add(OPERATION_TIMEOUT);
    +        props.add(STATS_INTERVAL);
    +        props.add(USE_TCP_NO_DELAY);
    +        properties = Collections.unmodifiableList(props);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    /**
    +     * @param context
    +     *            the configuration context
    +     * @throws InitializationException
    +     *             if unable to create a database connection
    +     */
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +
    +            createClient(context);
    +
    +            if (this.client == null)
    +                throw new InitializationException("Unable to create Pulsar Client");
    +
    +            producers = new ResourcePoolImpl<PulsarProducer>(new PulsarProducerFactory(client), context.getProperty(MAX_PRODUCERS).asInteger());
    +            consumers = new ResourcePoolImpl<PulsarConsumer>(new PulsarConsumerFactory(client), context.getProperty(MAX_CONSUMERS).asInteger());
    +
    +    }
    +
    +    private void createClient(final ConfigurationContext context) throws InitializationException {
    +
    +            // We can't create a client without a service URL.
    +            if (!context.getProperty(PULSAR_SERVICE_URL).isSet()) {
    +                return;
    +            }
    +
    +            try {
    +            this.client = PulsarClient.create(buildPulsarBrokerRootUrl(context.getProperty(PULSAR_SERVICE_URL).getValue(),
    +                        getClientConfig(context).isUseTls()), getClientConfig(context));
    +
    +        } catch (Exception e) {
    +            throw new InitializationException("Unable to create Pulsar Client", e);
    +        }
    +
    +    }
    +
    +    private static String buildPulsarBrokerRootUrl(String uri, boolean tlsEnabled) {
    +        StringBuilder builder = new StringBuilder();
    +        builder.append("pulsar");
    +
    +        if (tlsEnabled)
    +                builder.append("+ssl");
    +
    +        builder.append("://");
    +        builder.append(uri);
    +        return builder.toString();
    +    }
    +
    +    private ClientConfiguration getClientConfig(ConfigurationContext context) throws UnsupportedAuthenticationException {
    +
    +        if (clientConfig == null) {
    +            clientConfig = new ClientConfiguration();
    +
    +            if (context.getProperty(CONCURRENT_LOOKUP_REQUESTS).isSet()) {
    --- End diff --
    
    I noticed above that most of these were set to be optional. Are you trying to let the API impose its own defaults if the user removes the default value you specified in each of these optionals? If that's not your intent, I would make them all required since you already provide a default value for each of them. Then you can get rid of all of these if statements.


---

[GitHub] nifi issue #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2553
  
    Yeah. Since this is your first time, I'd recommend backing up the repo before trying it. Once you've gotten them down to one commit, it won't need to be done again until the review process is done.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174878824
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java ---
    @@ -0,0 +1,44 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +public class TestStandardPulsarClientService {
    +
    +    @Before
    +    public void init() {
    +
    +    }
    +
    +    @Test
    +    public void testService() throws InitializationException {
    +        final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
    +        final PulsarClientPool service = new StandardPulsarClientPool();
    +        runner.addControllerService("test-good", service);
    +
    +        runner.setProperty(service, StandardPulsarClientPool.PULSAR_SERVICE_URL, "localhost:6667");
    +        // runner.enableControllerService(service);
    --- End diff --
    
    I think you might actually needs this. If that's not the case, it should be removed.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174898281
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.pulsar.client.api.ClientConfiguration;
    +import org.apache.pulsar.client.api.PulsarClient;
    +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
    +import org.apache.pulsar.client.impl.auth.AuthenticationTls;
    +
    +@Tags({ "Pulsar"})
    +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.")
    +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool {
    +
    +    public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor
    +            .Builder().name("PULSAR_SERVICE_URL")
    +            .displayName("Pulsar Service URL")
    +            .description("URL for the Pulsar cluster, e.g localhost:6650")
    +            .required(true)
    +            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum concurrent lookup-requests")
    +            .description("Number of concurrent lookup-requests allowed on each broker-connection to prevent "
    +                    + "overload on broker. (default: 5000) It should be configured with higher value only in case "
    +                    + "of it requires to produce/subscribe on thousands of topics")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder()
    +            .name("Maximum connects per Pulsar broker")
    +            .description("Sets the max number of connection that the client library will open to a single broker.\n" +
    +                    "By default, the connection pool will use a single connection for all the producers and consumers. " +
    +                    "Increasing this parameter may improve throughput when using many producers over a high latency connection")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder()
    +            .name("I/O Threads")
    +            .description("The number of threads to be used for handling connections to brokers (default: 1 thread)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor LISTENER_THREADS = new PropertyDescriptor.Builder()
    +            .name("Listener Threads")
    +            .description("The number of threads to be used for message listeners (default: 1 thread)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor MAXIMUM_REJECTED_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum rejected requests per connection")
    +            .description("Max number of broker-rejected requests in a certain time-frame (30 seconds) after " +
    +                "which current connection will be closed and client creates a new connection that give " +
    +                "chance to connect a different broker (default: 50)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor OPERATION_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Operation Timeout")
    +            .description("Producer-create, subscribe and unsubscribe operations will be retried until this " +
    +                "interval, after which the operation will be maked as failed (default: 30 seconds)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("30")
    +            .build();
    +
    +    public static final PropertyDescriptor STATS_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("Stats interval")
    +            .description("The interval between each stat info (default: 60 seconds) Stats will be activated " +
    +                "with positive statsIntervalSeconds It should be set to at least 1 second")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("60")
    +            .build();
    +
    +    public static final PropertyDescriptor USE_TCP_NO_DELAY = new PropertyDescriptor.Builder()
    +            .name("Use TCP nodelay flag")
    +            .description("Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm.\n"
    +                    + "No-delay features make sure packets are sent out on the network as soon as possible, and it's critical "
    +                    + "to achieve low latency publishes. On the other hand, sending out a huge number of small packets might "
    +                    + "limit the overall throughput, so if latency is not a concern, it's advisable to set the useTcpNoDelay "
    +                    + "flag to false.")
    +            .required(false)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_PRODUCERS = new PropertyDescriptor
    +            .Builder().name("MAX_PRODUCERS")
    +            .displayName("Producer Pool Size")
    +            .description("The Maximum Number of Pulsar Producers created by this Pulsar Client Pool")
    +            .required(true)
    +            .defaultValue("10")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_CONSUMERS = new PropertyDescriptor
    +            .Builder().name("MAX_CONSUMERS")
    +            .displayName("Consumer Pool Size")
    +            .description("The Maximum Number of Pulsar consumers created by this Pulsar Client Pool")
    +            .required(true)
    +            .defaultValue("10")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    +            .name("ssl.context.service")
    +            .displayName("SSL Context Service")
    +            .description("Specifies the SSL Context Service to use for communicating with Pulsar.")
    +            .required(false)
    +            .identifiesControllerService(SSLContextService.class)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties;
    +    private volatile PulsarClient client;
    +
    +        private volatile ResourcePoolImpl<PulsarProducer> producers;
    +        private volatile ResourcePoolImpl<PulsarConsumer> consumers;
    +        private ClientConfiguration clientConfig;
    +
    +    static {
    +        final List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(PULSAR_SERVICE_URL);
    +        props.add(MAX_CONSUMERS);
    +        props.add(MAX_PRODUCERS);
    +        props.add(CONCURRENT_LOOKUP_REQUESTS);
    +        props.add(CONNECTIONS_PER_BROKER);
    +        props.add(IO_THREADS);
    +        props.add(LISTENER_THREADS);
    +        props.add(MAXIMUM_REJECTED_REQUESTS);
    +        props.add(OPERATION_TIMEOUT);
    +        props.add(STATS_INTERVAL);
    +        props.add(USE_TCP_NO_DELAY);
    +        properties = Collections.unmodifiableList(props);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    /**
    +     * @param context
    +     *            the configuration context
    +     * @throws InitializationException
    +     *             if unable to create a database connection
    +     */
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +
    +            createClient(context);
    +
    +            if (this.client == null)
    +                throw new InitializationException("Unable to create Pulsar Client");
    +
    +            producers = new ResourcePoolImpl<PulsarProducer>(new PulsarProducerFactory(client), context.getProperty(MAX_PRODUCERS).asInteger());
    +            consumers = new ResourcePoolImpl<PulsarConsumer>(new PulsarConsumerFactory(client), context.getProperty(MAX_CONSUMERS).asInteger());
    +
    +    }
    +
    +    private void createClient(final ConfigurationContext context) throws InitializationException {
    +
    +            // We can't create a client without a service URL.
    +            if (!context.getProperty(PULSAR_SERVICE_URL).isSet()) {
    +                return;
    +            }
    +
    +            try {
    +            this.client = PulsarClient.create(buildPulsarBrokerRootUrl(context.getProperty(PULSAR_SERVICE_URL).getValue(),
    +                        getClientConfig(context).isUseTls()), getClientConfig(context));
    +
    +        } catch (Exception e) {
    +            throw new InitializationException("Unable to create Pulsar Client", e);
    +        }
    +
    +    }
    +
    +    private static String buildPulsarBrokerRootUrl(String uri, boolean tlsEnabled) {
    +        StringBuilder builder = new StringBuilder();
    +        builder.append("pulsar");
    +
    +        if (tlsEnabled)
    +                builder.append("+ssl");
    +
    +        builder.append("://");
    +        builder.append(uri);
    +        return builder.toString();
    +    }
    +
    +    private ClientConfiguration getClientConfig(ConfigurationContext context) throws UnsupportedAuthenticationException {
    +
    +        if (clientConfig == null) {
    +            clientConfig = new ClientConfiguration();
    +
    +            if (context.getProperty(CONCURRENT_LOOKUP_REQUESTS).isSet()) {
    --- End diff --
    
    It is my intent to use the default values for org.apache.pulsar.client.api.ClientConfiguration, if and only if the user didn't explicitly override them via one of these properties.  The default values I provide for each of these is directly from the API default values....


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174883410
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorCompletionService;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.nifi.pulsar.PulsarConsumer;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.ConsumerConfiguration;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +import org.apache.pulsar.client.api.SubscriptionType;
    +
    +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@CapabilityDescription("Consumes messages from Apache Pulsar "
    +        + "The complementary NiFi processor for sending messages is PublishPulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
    +
    +    static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name");
    +    static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages");
    +    static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer "
    +            + "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages");
    +
    +    protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
    +            .name("topic")
    +            .displayName("Topic Name")
    +            .description("The name of the Pulsar Topic.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
    +            .name("Subscription")
    +            .displayName("Subscription Name")
    +            .description("The name of the Pulsar subscription to consume from.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor ASYNC_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Async Enabled")
    +            .description("Control whether the messages will be consumed asyncronously or not. Messages consumed"
    +                    + " syncronously will be acknowledged immediately before processing the next message, while"
    +                    + " asyncronous messages will be acknowledged after the Pulsar broker responds.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_ASYNC_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum Async Requests")
    +            .description("The maximum number of outstanding asynchronous consumer requests for this processor. "
    +                    + "Each asynchronous call requires memory, so avoid setting this value to high.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor ACK_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Acknowledgment Timeout")
    +            .description("Set the timeout (in milliseconds) for unacked messages, truncated to the "
    +                    + "nearest millisecond. The timeout needs to be greater than 10 seconds.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .defaultValue("10000")
    +            .build();
    +
    +    public static final PropertyDescriptor PRIORITY_LEVEL = new PropertyDescriptor.Builder()
    +            .name("Consumer Priority Level")
    +            .description("Sets priority level for the shared subscription consumers to which broker "
    +                    + "gives more priority while dispatching messages. Here, broker follows descending "
    +                    + "priorities. (eg: 0=max-priority, 1, 2,..) ")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5")
    +            .build();
    +
    +    public static final PropertyDescriptor RECEIVER_QUEUE_SIZE = new PropertyDescriptor.Builder()
    +            .name("Consumer receive queue size.")
    +            .description("The consumer receive queue controls how many messages can be accumulated "
    +                    + "by the Consumer before the application calls Consumer.receive(). Using a higher "
    +                    + "value could potentially increase the consumer throughput at the expense of bigger "
    +                    + "memory utilization. \n"
    +                    + "Setting the consumer queue size as zero, \n"
    +                    + "\t - Decreases the throughput of the consumer, by disabling pre-fetching of messages. \n"
    +                    + "\t - Doesn't support Batch-Message: if consumer receives any batch-message then it closes consumer "
    +                    + "connection with broker and consumer will not be able receive any further message unless batch-message "
    +                    + "in pipeline is removed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final PropertyDescriptor SUBSCRIPTION_TYPE = new PropertyDescriptor.Builder()
    +            .name("Subscription Type")
    +            .description("Select the subscription type to be used when subscribing to the topic.")
    +            .required(false)
    +            .allowableValues(EXCLUSIVE, SHARED, FAILOVER)
    +            .defaultValue(SHARED.getValue())
    +            .build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    // Reuse the same consumer for a given topic / subscription
    +    private PulsarConsumer consumer;
    +    private ConsumerConfiguration consumerConfig;
    +
    +    // Pool for running multiple consume Async requests
    +    ExecutorService pool;
    +    ExecutorCompletionService<Message> completionService;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void init(ProcessContext context) {
    +            pool = Executors.newFixedThreadPool(context.getProperty(MAX_ASYNC_REQUESTS).asInteger());
    +            completionService = new ExecutorCompletionService<>(pool);
    +    }
    +
    +    @OnUnscheduled
    +    public void shutDown() {
    +            // Stop all the async consumers
    +            pool.shutdownNow();
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        try {
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +                // Launch consumers
    +                consumeAsync(context, session);
    +
    +                // Handle completed consumers
    +                handleAsync(context, session);
    +
    +            } else {
    +                consume(context, session);
    +            }
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +
    +    }
    +
    +    private void handleAsync(ProcessContext context, ProcessSession session) {
    +
    +        try {
    +            Future<Message> done = completionService.take();
    +            Message msg = done.get();
    +
    +            if (msg != null) {
    +                FlowFile flowFile = null;
    +                    final byte[] value = msg.getData();
    +                    if (value != null && value.length > 0) {
    +                        flowFile = session.create();
    +                        flowFile = session.write(flowFile, out -> {
    +                            out.write(value);
    +                        });
    +                    }
    +
    +                    session.getProvenanceReporter().receive(flowFile, "From " + context.getProperty(TOPIC).getValue());
    +                    session.transfer(flowFile, REL_SUCCESS);
    +                    session.commit();
    +                    getWrappedConsumer(context).getConsumer().acknowledgeAsync(msg);
    +            }
    +
    +        } catch (InterruptedException | ExecutionException | PulsarClientException e) {
    +            getLogger().error("Trouble consuming messages ", e);
    +        }
    +
    +    }
    +
    +    @OnStopped
    +    public void close(final ProcessContext context) {
    +
    +        getLogger().info("Disconnecting Pulsar Consumer");
    +        if (consumer != null) {
    +
    +                context.getProperty(PULSAR_CLIENT_SERVICE)
    +                    .asControllerService(PulsarClientPool.class)
    +                    .getConsumerPool().evict(consumer);
    +        }
    +
    +        consumer = null;
    +    }
    +
    +    /*
    +     * For now let's assume that this processor will be configured to run for a longer
    +     * duration than 0 milliseconds. So we will be grabbing as many messages off the topic
    +     * as possible and committing them as FlowFiles
    +     */
    +    private void consumeAsync(ProcessContext context, ProcessSession session) throws PulsarClientException {
    +
    +            Consumer consumer = getWrappedConsumer(context).getConsumer();
    +
    +            completionService.submit(new Callable<Message>() {
    +                @Override
    +                public Message call() throws Exception {
    +                        return consumer.receiveAsync().get();
    +                }
    +              });
    +
    +    }
    +
    +    /*
    +     * When this Processor expects to receive many small files, it may
    +     * be advisable to create several FlowFiles from a single session
    +     * before committing the session. Typically, this allows the Framework
    +     * to treat the content of the newly created FlowFiles much more efficiently.
    +     */
    +    private void consume(ProcessContext context, ProcessSession session) throws PulsarClientException {
    +
    +        Consumer consumer = getWrappedConsumer(context).getConsumer();
    +
    +        final ComponentLog logger = getLogger();
    +        final Message msg;
    +        FlowFile flowFile = null;
    +
    +        try {
    +
    +                msg = consumer.receive();
    +                final byte[] value = msg.getData();
    +
    +                if (value != null && value.length > 0) {
    +                    flowFile = session.create();
    +                    flowFile = session.write(flowFile, out -> {
    +                        out.write(value);
    +                    });
    +
    +                    session.getProvenanceReporter().receive(flowFile, "From " + context.getProperty(TOPIC).getValue());
    +                session.transfer(flowFile, REL_SUCCESS);
    +                logger.info("Created {} from {} messages received from Pulsar Server and transferred to 'success'",
    +                            new Object[]{flowFile, 1});
    +
    +                session.commit();
    +
    +                /*
    +                 * This Processor acknowledges receipt of the data and/or removes the data
    +                 * from the external source in order to prevent receipt of duplicate files.
    +                 * This is done only after the ProcessSession by which the FlowFile was created
    +                 * has been committed! Failure to adhere to this principle may result in data
    +                 * loss, as restarting NiFi before the session has been committed will result
    +                 * in the temporary file being deleted. Note, however, that it is possible using
    +                 * this approach to receive duplicate data because the application could be
    +                 * restarted after committing the session and before acknowledging or removing
    +                 * the data from the external source. In general, though, potential data duplication
    +                 * is preferred over potential data loss.
    +                 */
    +                getLogger().info("Acknowledging message " + msg.getMessageId());
    +                consumer.acknowledge(msg);
    +
    +                } else {
    +                    // We didn't consume any data, so
    +                    session.commit();
    +                }
    +
    +        } catch (PulsarClientException e) {
    +            context.yield();
    +            session.rollback();
    +        }
    +
    +    }
    +
    +    private PulsarConsumer getWrappedConsumer(ProcessContext context) throws PulsarClientException {
    +
    +        if (consumer != null)
    +            return consumer;
    +
    +        final PulsarClientPool pulsarClientService = context.getProperty(PULSAR_CLIENT_SERVICE)
    +                .asControllerService(PulsarClientPool.class);
    +
    +        try {
    +            consumer = pulsarClientService.getConsumerPool()
    +                    .acquire(getConsumerProperties(context));
    +
    +            if (consumer == null || consumer.getConsumer() == null) {
    +                throw new PulsarClientException("Unable to create Pulsar Consumer");
    +            }
    +
    +            return consumer;
    +        } catch (final InterruptedException ex) {
    +            return null;
    +        }
    +    }
    +
    +    private Properties getConsumerProperties(ProcessContext context) {
    +
    +        Properties props = new Properties();
    +        props.put(PulsarConsumerFactory.TOPIC_NAME, context.getProperty(TOPIC).getValue());
    +        props.put(PulsarConsumerFactory.SUBSCRIPTION_NAME, context.getProperty(SUBSCRIPTION).getValue());
    +        props.put(PulsarConsumerFactory.CONSUMER_CONFIG, getConsumerConfig(context));
    +        return props;
    +    }
    +
    +    private ConsumerConfiguration getConsumerConfig(ProcessContext context) {
    +
    +        if (consumerConfig == null) {
    +            consumerConfig = new ConsumerConfiguration();
    +
    +            if (context.getProperty(ACK_TIMEOUT).isSet())
    --- End diff --
    
    See my earlier comment about required fields and making this sort of thing simpler.


---

[GitHub] nifi issue #2553: Nifi 4908 rebase

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2553
  
    Alright, sounds good.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174879831
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorCompletionService;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.nifi.pulsar.PulsarConsumer;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.ConsumerConfiguration;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +import org.apache.pulsar.client.api.SubscriptionType;
    +
    +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@CapabilityDescription("Consumes messages from Apache Pulsar "
    +        + "The complementary NiFi processor for sending messages is PublishPulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
    --- End diff --
    
    Also note that as it is an incubator project and someone really doesn't want to track 1.X going forward AND complains that you broke compatibility for them by staying up to date with 1.X, that's on them. Incubator projects are by definition moving targets and should be handled that way during risk assessment by teams using them.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174873036
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.pulsar.client.api.ClientConfiguration;
    +import org.apache.pulsar.client.api.PulsarClient;
    +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
    +import org.apache.pulsar.client.impl.auth.AuthenticationTls;
    +
    +@Tags({ "Pulsar"})
    +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.")
    +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool {
    +
    +    public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor
    +            .Builder().name("PULSAR_SERVICE_URL")
    +            .displayName("Pulsar Service URL")
    +            .description("URL for the Pulsar cluster, e.g localhost:6650")
    +            .required(true)
    +            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum concurrent lookup-requests")
    +            .description("Number of concurrent lookup-requests allowed on each broker-connection to prevent "
    +                    + "overload on broker. (default: 5000) It should be configured with higher value only in case "
    +                    + "of it requires to produce/subscribe on thousands of topics")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder()
    +            .name("Maximum connects per Pulsar broker")
    +            .description("Sets the max number of connection that the client library will open to a single broker.\n" +
    +                    "By default, the connection pool will use a single connection for all the producers and consumers. " +
    +                    "Increasing this parameter may improve throughput when using many producers over a high latency connection")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder()
    +            .name("I/O Threads")
    +            .description("The number of threads to be used for handling connections to brokers (default: 1 thread)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor LISTENER_THREADS = new PropertyDescriptor.Builder()
    +            .name("Listener Threads")
    +            .description("The number of threads to be used for message listeners (default: 1 thread)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor MAXIMUM_REJECTED_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum rejected requests per connection")
    +            .description("Max number of broker-rejected requests in a certain time-frame (30 seconds) after " +
    +                "which current connection will be closed and client creates a new connection that give " +
    +                "chance to connect a different broker (default: 50)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor OPERATION_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Operation Timeout")
    +            .description("Producer-create, subscribe and unsubscribe operations will be retried until this " +
    +                "interval, after which the operation will be maked as failed (default: 30 seconds)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("30")
    +            .build();
    +
    +    public static final PropertyDescriptor STATS_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("Stats interval")
    +            .description("The interval between each stat info (default: 60 seconds) Stats will be activated " +
    +                "with positive statsIntervalSeconds It should be set to at least 1 second")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("60")
    +            .build();
    +
    +    public static final PropertyDescriptor USE_TCP_NO_DELAY = new PropertyDescriptor.Builder()
    +            .name("Use TCP nodelay flag")
    +            .description("Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm.\n"
    +                    + "No-delay features make sure packets are sent out on the network as soon as possible, and it's critical "
    +                    + "to achieve low latency publishes. On the other hand, sending out a huge number of small packets might "
    +                    + "limit the overall throughput, so if latency is not a concern, it's advisable to set the useTcpNoDelay "
    +                    + "flag to false.")
    +            .required(false)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_PRODUCERS = new PropertyDescriptor
    +            .Builder().name("MAX_PRODUCERS")
    +            .displayName("Producer Pool Size")
    +            .description("The Maximum Number of Pulsar Producers created by this Pulsar Client Pool")
    +            .required(true)
    +            .defaultValue("10")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_CONSUMERS = new PropertyDescriptor
    +            .Builder().name("MAX_CONSUMERS")
    +            .displayName("Consumer Pool Size")
    +            .description("The Maximum Number of Pulsar consumers created by this Pulsar Client Pool")
    +            .required(true)
    +            .defaultValue("10")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    +            .name("ssl.context.service")
    +            .displayName("SSL Context Service")
    +            .description("Specifies the SSL Context Service to use for communicating with Pulsar.")
    +            .required(false)
    +            .identifiesControllerService(SSLContextService.class)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties;
    +    private volatile PulsarClient client;
    +
    +        private volatile ResourcePoolImpl<PulsarProducer> producers;
    +        private volatile ResourcePoolImpl<PulsarConsumer> consumers;
    +        private ClientConfiguration clientConfig;
    +
    +    static {
    +        final List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(PULSAR_SERVICE_URL);
    +        props.add(MAX_CONSUMERS);
    +        props.add(MAX_PRODUCERS);
    +        props.add(CONCURRENT_LOOKUP_REQUESTS);
    +        props.add(CONNECTIONS_PER_BROKER);
    +        props.add(IO_THREADS);
    +        props.add(LISTENER_THREADS);
    +        props.add(MAXIMUM_REJECTED_REQUESTS);
    +        props.add(OPERATION_TIMEOUT);
    +        props.add(STATS_INTERVAL);
    +        props.add(USE_TCP_NO_DELAY);
    +        properties = Collections.unmodifiableList(props);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    /**
    +     * @param context
    +     *            the configuration context
    +     * @throws InitializationException
    +     *             if unable to create a database connection
    +     */
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +
    +            createClient(context);
    +
    +            if (this.client == null)
    +                throw new InitializationException("Unable to create Pulsar Client");
    +
    +            producers = new ResourcePoolImpl<PulsarProducer>(new PulsarProducerFactory(client), context.getProperty(MAX_PRODUCERS).asInteger());
    +            consumers = new ResourcePoolImpl<PulsarConsumer>(new PulsarConsumerFactory(client), context.getProperty(MAX_CONSUMERS).asInteger());
    +
    +    }
    +
    +    private void createClient(final ConfigurationContext context) throws InitializationException {
    +
    +            // We can't create a client without a service URL.
    +            if (!context.getProperty(PULSAR_SERVICE_URL).isSet()) {
    +                return;
    +            }
    +
    +            try {
    +            this.client = PulsarClient.create(buildPulsarBrokerRootUrl(context.getProperty(PULSAR_SERVICE_URL).getValue(),
    +                        getClientConfig(context).isUseTls()), getClientConfig(context));
    +
    +        } catch (Exception e) {
    +            throw new InitializationException("Unable to create Pulsar Client", e);
    +        }
    +
    +    }
    +
    +    private static String buildPulsarBrokerRootUrl(String uri, boolean tlsEnabled) {
    +        StringBuilder builder = new StringBuilder();
    +        builder.append("pulsar");
    +
    +        if (tlsEnabled)
    --- End diff --
    
    Please add curly brackets.


---

[GitHub] nifi issue #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2553
  
    The rule of thumb there is to be patient and do it multiple steps with as many commits as you have in these two branches. I'd recommend only squashing a few commits together a time until 4914 is one pristine commit ready for review.


---

[GitHub] nifi issue #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2553
  
    Read [this blog post](http://gitready.com/advanced/2009/02/10/squashing-commits-with-rebase.html). It goes into good detail about squashing commits during a rebase, along with the caveats in doing so. Basically what you do is follow those directions (pretty simple, but worth it for the background) and then do `git push origin BRANCH_HOME --force` to override the existing remote version of 4914 once you have rebased and squashed.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174878320
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java ---
    @@ -0,0 +1,44 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +public class TestStandardPulsarClientService {
    +
    +    @Before
    +    public void init() {
    --- End diff --
    
    Looks like you can delete this.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174884531
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java ---
    @@ -0,0 +1,373 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.nifi.pulsar.PulsarProducer;
    +import org.apache.nifi.pulsar.cache.LRUCache;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.pulsar.client.api.CompressionType;
    +import org.apache.pulsar.client.api.MessageId;
    +import org.apache.pulsar.client.api.Producer;
    +import org.apache.pulsar.client.api.ProducerConfiguration;
    +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +
    +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
    +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the Pulsar 1.21 Producer API."
    +    + "The messages to send may be individual FlowFiles or may be delimited, using a "
    +    + "user-specified delimiter, such as a new-line. "
    +    + "The complementary NiFi processor for fetching messages is ConsumePulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to "
    +        + "FlowFiles that are routed to success.")
    +public class PublishPulsar_1_0 extends AbstractPulsarProcessor {
    +
    +    protected static final String MSG_COUNT = "msg.count";
    +
    +    static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression");
    +    static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
    +    static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
    +
    +    static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition");
    +    static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route messages to all "
    +                                                                                                                       + "partitions in a round robin manner");
    +    static final AllowableValue MESSAGE_ROUTING_MODE_SINGLE_PARTITION = new AllowableValue("SinglePartition", "Single Partition", "Route messages to a single partition");
    +
    +    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
    +            .name("topic")
    +            .displayName("Topic Name")
    +            .description("The name of the Pulsar Topic.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor ASYNC_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Async Enabled")
    +            .description("Control whether the messages will be sent asyncronously or not. Messages sent"
    +                    + " syncronously will be acknowledged immediately before processing the next message, while"
    +                    + " asyncronous messages will be acknowledged after the Pulsar broker responds.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCHING_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Batching Enabled")
    +            .description("Control whether automatic batching of messages is enabled for the producer. "
    +                    + "default: false [No batching] When batching is enabled, multiple calls to "
    +                    + "Producer.sendAsync can result in a single batch to be sent to the broker, leading "
    +                    + "to better throughput, especially when publishing small messages. If compression is "
    +                    + "enabled, messages will be compressed at the batch level, leading to a much better "
    +                    + "compression ratio for similar headers or contents. When enabled default batch delay "
    +                    + "is set to 10 ms and default batch size is 1000 messages")
    +            .required(false)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCHING_MAX_MESSAGES = new PropertyDescriptor.Builder()
    +            .name("Batching Max Messages")
    +            .description("Set the maximum number of messages permitted in a batch. default: "
    +                    + "1000 If set to a value greater than 1, messages will be queued until this "
    +                    + "threshold is reached or batch interval has elapsed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("Batch Interval")
    +            .description("Set the time period within which the messages sent will be batched default: 10ms "
    +                    + "if batch messages are enabled. If set to a non zero value, messages will be queued until "
    +                    + "this time interval or until the Batching Max Messages threshould has been reached")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .defaultValue("10")
    +            .build();
    +
    +    public static final PropertyDescriptor BLOCK_IF_QUEUE_FULL = new PropertyDescriptor.Builder()
    +            .name("Block if Message Queue Full")
    +            .description("Set whether the processor should block when the outgoing message queue is full. "
    +                    + "Default is false. If set to false, send operations will immediately fail with "
    +                    + "ProducerQueueIsFullError when there is no space left in pending queue.")
    +            .required(false)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
    +            .name("Compression Type")
    +            .description("Set the compression type for the producer.")
    +            .required(false)
    +            .allowableValues(COMPRESSION_TYPE_NONE, COMPRESSION_TYPE_LZ4, COMPRESSION_TYPE_ZLIB)
    +            .defaultValue(COMPRESSION_TYPE_NONE.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor MESSAGE_ROUTING_MODE = new PropertyDescriptor.Builder()
    +            .name("Message Routing Mode")
    +            .description("Set the message routing mode for the producer. This applies only if the destination topic is partitioned")
    +            .required(false)
    +            .allowableValues(MESSAGE_ROUTING_MODE_CUSTOM_PARTITION, MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION, MESSAGE_ROUTING_MODE_SINGLE_PARTITION)
    +            .defaultValue(MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor PENDING_MAX_MESSAGES = new PropertyDescriptor.Builder()
    +            .name("Max Pending Messages")
    +            .description("Set the max size of the queue holding the messages pending to receive an "
    +                    + "acknowledgment from the broker.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    private LRUCache<String, PulsarProducer> producers;
    +    private ProducerConfiguration producerConfig;
    +
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(BATCHING_ENABLED);
    +        properties.add(BATCHING_MAX_MESSAGES);
    +        properties.add(BATCH_INTERVAL);
    +        properties.add(BLOCK_IF_QUEUE_FULL);
    +        properties.add(COMPRESSION_TYPE);
    +        properties.add(MESSAGE_ROUTING_MODE);
    +        properties.add(PENDING_MAX_MESSAGES);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnStopped
    +    public void cleanUp(final ProcessContext context) {
    +       // Close all of the producers and invalidate them, so they get removed from the Resource Pool
    +       getProducerCache(context).clear();
    +    }
    +
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        FlowFile flowFile = session.get();
    +
    +        if (flowFile == null)
    +            return;
    +
    +        final ComponentLog logger = getLogger();
    +        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        if (StringUtils.isBlank(topic)) {
    +            logger.error("Invalid topic specified {}", new Object[] {topic});
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        // Read the contents of the FlowFile into a byte array
    +        final byte[] messageContent = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    --- End diff --
    
    Turning this into a lambda would save some lines.


---

[GitHub] nifi issue #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2553
  
    Ok. If you merged 4908 into 4914, go ahead and close out this out. Also, it would be a good idea to rebase 4914 and squash it into one commit starting out for a code review.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174872630
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.pulsar.client.api.ClientConfiguration;
    +import org.apache.pulsar.client.api.PulsarClient;
    +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
    +import org.apache.pulsar.client.impl.auth.AuthenticationTls;
    +
    +@Tags({ "Pulsar"})
    +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.")
    +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool {
    +
    +    public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor
    +            .Builder().name("PULSAR_SERVICE_URL")
    +            .displayName("Pulsar Service URL")
    +            .description("URL for the Pulsar cluster, e.g localhost:6650")
    +            .required(true)
    +            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum concurrent lookup-requests")
    +            .description("Number of concurrent lookup-requests allowed on each broker-connection to prevent "
    +                    + "overload on broker. (default: 5000) It should be configured with higher value only in case "
    +                    + "of it requires to produce/subscribe on thousands of topics")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder()
    +            .name("Maximum connects per Pulsar broker")
    +            .description("Sets the max number of connection that the client library will open to a single broker.\n" +
    +                    "By default, the connection pool will use a single connection for all the producers and consumers. " +
    +                    "Increasing this parameter may improve throughput when using many producers over a high latency connection")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder()
    +            .name("I/O Threads")
    +            .description("The number of threads to be used for handling connections to brokers (default: 1 thread)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor LISTENER_THREADS = new PropertyDescriptor.Builder()
    +            .name("Listener Threads")
    +            .description("The number of threads to be used for message listeners (default: 1 thread)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor MAXIMUM_REJECTED_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum rejected requests per connection")
    +            .description("Max number of broker-rejected requests in a certain time-frame (30 seconds) after " +
    +                "which current connection will be closed and client creates a new connection that give " +
    +                "chance to connect a different broker (default: 50)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor OPERATION_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Operation Timeout")
    +            .description("Producer-create, subscribe and unsubscribe operations will be retried until this " +
    +                "interval, after which the operation will be maked as failed (default: 30 seconds)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("30")
    +            .build();
    +
    +    public static final PropertyDescriptor STATS_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("Stats interval")
    +            .description("The interval between each stat info (default: 60 seconds) Stats will be activated " +
    +                "with positive statsIntervalSeconds It should be set to at least 1 second")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("60")
    +            .build();
    +
    +    public static final PropertyDescriptor USE_TCP_NO_DELAY = new PropertyDescriptor.Builder()
    +            .name("Use TCP nodelay flag")
    +            .description("Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm.\n"
    +                    + "No-delay features make sure packets are sent out on the network as soon as possible, and it's critical "
    +                    + "to achieve low latency publishes. On the other hand, sending out a huge number of small packets might "
    +                    + "limit the overall throughput, so if latency is not a concern, it's advisable to set the useTcpNoDelay "
    +                    + "flag to false.")
    +            .required(false)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_PRODUCERS = new PropertyDescriptor
    +            .Builder().name("MAX_PRODUCERS")
    +            .displayName("Producer Pool Size")
    +            .description("The Maximum Number of Pulsar Producers created by this Pulsar Client Pool")
    +            .required(true)
    +            .defaultValue("10")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_CONSUMERS = new PropertyDescriptor
    +            .Builder().name("MAX_CONSUMERS")
    +            .displayName("Consumer Pool Size")
    +            .description("The Maximum Number of Pulsar consumers created by this Pulsar Client Pool")
    +            .required(true)
    +            .defaultValue("10")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    +            .name("ssl.context.service")
    +            .displayName("SSL Context Service")
    +            .description("Specifies the SSL Context Service to use for communicating with Pulsar.")
    +            .required(false)
    +            .identifiesControllerService(SSLContextService.class)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties;
    +    private volatile PulsarClient client;
    +
    +        private volatile ResourcePoolImpl<PulsarProducer> producers;
    +        private volatile ResourcePoolImpl<PulsarConsumer> consumers;
    +        private ClientConfiguration clientConfig;
    +
    +    static {
    +        final List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(PULSAR_SERVICE_URL);
    +        props.add(MAX_CONSUMERS);
    +        props.add(MAX_PRODUCERS);
    +        props.add(CONCURRENT_LOOKUP_REQUESTS);
    +        props.add(CONNECTIONS_PER_BROKER);
    +        props.add(IO_THREADS);
    +        props.add(LISTENER_THREADS);
    +        props.add(MAXIMUM_REJECTED_REQUESTS);
    +        props.add(OPERATION_TIMEOUT);
    +        props.add(STATS_INTERVAL);
    +        props.add(USE_TCP_NO_DELAY);
    +        properties = Collections.unmodifiableList(props);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    /**
    +     * @param context
    --- End diff --
    
    These javadoc lines look off.


---

[GitHub] nifi issue #2553: Nifi 4908 rebase

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2553
  
    So the goal is to get my 23 commits squashed down into a single commit?


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174881995
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorCompletionService;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.nifi.pulsar.PulsarConsumer;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.ConsumerConfiguration;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +import org.apache.pulsar.client.api.SubscriptionType;
    +
    +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@CapabilityDescription("Consumes messages from Apache Pulsar "
    +        + "The complementary NiFi processor for sending messages is PublishPulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
    +
    +    static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name");
    +    static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages");
    +    static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer "
    +            + "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages");
    +
    +    protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
    +            .name("topic")
    +            .displayName("Topic Name")
    +            .description("The name of the Pulsar Topic.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
    +            .name("Subscription")
    +            .displayName("Subscription Name")
    +            .description("The name of the Pulsar subscription to consume from.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor ASYNC_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Async Enabled")
    +            .description("Control whether the messages will be consumed asyncronously or not. Messages consumed"
    +                    + " syncronously will be acknowledged immediately before processing the next message, while"
    +                    + " asyncronous messages will be acknowledged after the Pulsar broker responds.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_ASYNC_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum Async Requests")
    +            .description("The maximum number of outstanding asynchronous consumer requests for this processor. "
    +                    + "Each asynchronous call requires memory, so avoid setting this value to high.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor ACK_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Acknowledgment Timeout")
    +            .description("Set the timeout (in milliseconds) for unacked messages, truncated to the "
    +                    + "nearest millisecond. The timeout needs to be greater than 10 seconds.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .defaultValue("10000")
    +            .build();
    +
    +    public static final PropertyDescriptor PRIORITY_LEVEL = new PropertyDescriptor.Builder()
    +            .name("Consumer Priority Level")
    +            .description("Sets priority level for the shared subscription consumers to which broker "
    +                    + "gives more priority while dispatching messages. Here, broker follows descending "
    +                    + "priorities. (eg: 0=max-priority, 1, 2,..) ")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5")
    +            .build();
    +
    +    public static final PropertyDescriptor RECEIVER_QUEUE_SIZE = new PropertyDescriptor.Builder()
    +            .name("Consumer receive queue size.")
    +            .description("The consumer receive queue controls how many messages can be accumulated "
    +                    + "by the Consumer before the application calls Consumer.receive(). Using a higher "
    +                    + "value could potentially increase the consumer throughput at the expense of bigger "
    +                    + "memory utilization. \n"
    +                    + "Setting the consumer queue size as zero, \n"
    +                    + "\t - Decreases the throughput of the consumer, by disabling pre-fetching of messages. \n"
    +                    + "\t - Doesn't support Batch-Message: if consumer receives any batch-message then it closes consumer "
    +                    + "connection with broker and consumer will not be able receive any further message unless batch-message "
    +                    + "in pipeline is removed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final PropertyDescriptor SUBSCRIPTION_TYPE = new PropertyDescriptor.Builder()
    +            .name("Subscription Type")
    +            .description("Select the subscription type to be used when subscribing to the topic.")
    +            .required(false)
    +            .allowableValues(EXCLUSIVE, SHARED, FAILOVER)
    +            .defaultValue(SHARED.getValue())
    +            .build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    // Reuse the same consumer for a given topic / subscription
    +    private PulsarConsumer consumer;
    +    private ConsumerConfiguration consumerConfig;
    +
    +    // Pool for running multiple consume Async requests
    +    ExecutorService pool;
    +    ExecutorCompletionService<Message> completionService;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void init(ProcessContext context) {
    +            pool = Executors.newFixedThreadPool(context.getProperty(MAX_ASYNC_REQUESTS).asInteger());
    +            completionService = new ExecutorCompletionService<>(pool);
    +    }
    +
    +    @OnUnscheduled
    +    public void shutDown() {
    +            // Stop all the async consumers
    +            pool.shutdownNow();
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        try {
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +                // Launch consumers
    +                consumeAsync(context, session);
    +
    +                // Handle completed consumers
    +                handleAsync(context, session);
    +
    +            } else {
    +                consume(context, session);
    +            }
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +
    +    }
    +
    +    private void handleAsync(ProcessContext context, ProcessSession session) {
    +
    +        try {
    +            Future<Message> done = completionService.take();
    +            Message msg = done.get();
    +
    +            if (msg != null) {
    +                FlowFile flowFile = null;
    +                    final byte[] value = msg.getData();
    +                    if (value != null && value.length > 0) {
    +                        flowFile = session.create();
    +                        flowFile = session.write(flowFile, out -> {
    +                            out.write(value);
    +                        });
    +                    }
    +
    +                    session.getProvenanceReporter().receive(flowFile, "From " + context.getProperty(TOPIC).getValue());
    +                    session.transfer(flowFile, REL_SUCCESS);
    +                    session.commit();
    +                    getWrappedConsumer(context).getConsumer().acknowledgeAsync(msg);
    +            }
    +
    +        } catch (InterruptedException | ExecutionException | PulsarClientException e) {
    +            getLogger().error("Trouble consuming messages ", e);
    +        }
    +
    +    }
    +
    +    @OnStopped
    +    public void close(final ProcessContext context) {
    +
    +        getLogger().info("Disconnecting Pulsar Consumer");
    +        if (consumer != null) {
    +
    +                context.getProperty(PULSAR_CLIENT_SERVICE)
    +                    .asControllerService(PulsarClientPool.class)
    +                    .getConsumerPool().evict(consumer);
    --- End diff --
    
    Maybe I'm just being pedantic, but this feels like it should be better encapsulated.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174882429
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorCompletionService;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.nifi.pulsar.PulsarConsumer;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.ConsumerConfiguration;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +import org.apache.pulsar.client.api.SubscriptionType;
    +
    +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@CapabilityDescription("Consumes messages from Apache Pulsar "
    +        + "The complementary NiFi processor for sending messages is PublishPulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
    +
    +    static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name");
    +    static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages");
    +    static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer "
    +            + "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages");
    +
    +    protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
    +            .name("topic")
    +            .displayName("Topic Name")
    +            .description("The name of the Pulsar Topic.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
    +            .name("Subscription")
    +            .displayName("Subscription Name")
    +            .description("The name of the Pulsar subscription to consume from.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor ASYNC_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Async Enabled")
    +            .description("Control whether the messages will be consumed asyncronously or not. Messages consumed"
    +                    + " syncronously will be acknowledged immediately before processing the next message, while"
    +                    + " asyncronous messages will be acknowledged after the Pulsar broker responds.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_ASYNC_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum Async Requests")
    +            .description("The maximum number of outstanding asynchronous consumer requests for this processor. "
    +                    + "Each asynchronous call requires memory, so avoid setting this value to high.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor ACK_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Acknowledgment Timeout")
    +            .description("Set the timeout (in milliseconds) for unacked messages, truncated to the "
    +                    + "nearest millisecond. The timeout needs to be greater than 10 seconds.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .defaultValue("10000")
    +            .build();
    +
    +    public static final PropertyDescriptor PRIORITY_LEVEL = new PropertyDescriptor.Builder()
    +            .name("Consumer Priority Level")
    +            .description("Sets priority level for the shared subscription consumers to which broker "
    +                    + "gives more priority while dispatching messages. Here, broker follows descending "
    +                    + "priorities. (eg: 0=max-priority, 1, 2,..) ")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5")
    +            .build();
    +
    +    public static final PropertyDescriptor RECEIVER_QUEUE_SIZE = new PropertyDescriptor.Builder()
    +            .name("Consumer receive queue size.")
    +            .description("The consumer receive queue controls how many messages can be accumulated "
    +                    + "by the Consumer before the application calls Consumer.receive(). Using a higher "
    +                    + "value could potentially increase the consumer throughput at the expense of bigger "
    +                    + "memory utilization. \n"
    +                    + "Setting the consumer queue size as zero, \n"
    +                    + "\t - Decreases the throughput of the consumer, by disabling pre-fetching of messages. \n"
    +                    + "\t - Doesn't support Batch-Message: if consumer receives any batch-message then it closes consumer "
    +                    + "connection with broker and consumer will not be able receive any further message unless batch-message "
    +                    + "in pipeline is removed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final PropertyDescriptor SUBSCRIPTION_TYPE = new PropertyDescriptor.Builder()
    +            .name("Subscription Type")
    +            .description("Select the subscription type to be used when subscribing to the topic.")
    +            .required(false)
    +            .allowableValues(EXCLUSIVE, SHARED, FAILOVER)
    +            .defaultValue(SHARED.getValue())
    +            .build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    // Reuse the same consumer for a given topic / subscription
    +    private PulsarConsumer consumer;
    +    private ConsumerConfiguration consumerConfig;
    +
    +    // Pool for running multiple consume Async requests
    +    ExecutorService pool;
    +    ExecutorCompletionService<Message> completionService;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void init(ProcessContext context) {
    +            pool = Executors.newFixedThreadPool(context.getProperty(MAX_ASYNC_REQUESTS).asInteger());
    +            completionService = new ExecutorCompletionService<>(pool);
    +    }
    +
    +    @OnUnscheduled
    +    public void shutDown() {
    +            // Stop all the async consumers
    +            pool.shutdownNow();
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        try {
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +                // Launch consumers
    +                consumeAsync(context, session);
    +
    +                // Handle completed consumers
    +                handleAsync(context, session);
    +
    +            } else {
    +                consume(context, session);
    +            }
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +
    +    }
    +
    +    private void handleAsync(ProcessContext context, ProcessSession session) {
    +
    +        try {
    +            Future<Message> done = completionService.take();
    +            Message msg = done.get();
    +
    +            if (msg != null) {
    +                FlowFile flowFile = null;
    +                    final byte[] value = msg.getData();
    +                    if (value != null && value.length > 0) {
    +                        flowFile = session.create();
    +                        flowFile = session.write(flowFile, out -> {
    +                            out.write(value);
    +                        });
    +                    }
    +
    +                    session.getProvenanceReporter().receive(flowFile, "From " + context.getProperty(TOPIC).getValue());
    +                    session.transfer(flowFile, REL_SUCCESS);
    +                    session.commit();
    +                    getWrappedConsumer(context).getConsumer().acknowledgeAsync(msg);
    +            }
    +
    +        } catch (InterruptedException | ExecutionException | PulsarClientException e) {
    +            getLogger().error("Trouble consuming messages ", e);
    +        }
    +
    +    }
    +
    +    @OnStopped
    +    public void close(final ProcessContext context) {
    +
    +        getLogger().info("Disconnecting Pulsar Consumer");
    +        if (consumer != null) {
    +
    +                context.getProperty(PULSAR_CLIENT_SERVICE)
    +                    .asControllerService(PulsarClientPool.class)
    +                    .getConsumerPool().evict(consumer);
    +        }
    +
    +        consumer = null;
    +    }
    +
    +    /*
    +     * For now let's assume that this processor will be configured to run for a longer
    +     * duration than 0 milliseconds. So we will be grabbing as many messages off the topic
    +     * as possible and committing them as FlowFiles
    +     */
    +    private void consumeAsync(ProcessContext context, ProcessSession session) throws PulsarClientException {
    +
    +            Consumer consumer = getWrappedConsumer(context).getConsumer();
    +
    +            completionService.submit(new Callable<Message>() {
    +                @Override
    +                public Message call() throws Exception {
    +                        return consumer.receiveAsync().get();
    +                }
    +              });
    +
    +    }
    +
    +    /*
    +     * When this Processor expects to receive many small files, it may
    +     * be advisable to create several FlowFiles from a single session
    +     * before committing the session. Typically, this allows the Framework
    +     * to treat the content of the newly created FlowFiles much more efficiently.
    +     */
    +    private void consume(ProcessContext context, ProcessSession session) throws PulsarClientException {
    +
    +        Consumer consumer = getWrappedConsumer(context).getConsumer();
    +
    +        final ComponentLog logger = getLogger();
    +        final Message msg;
    +        FlowFile flowFile = null;
    +
    +        try {
    +
    +                msg = consumer.receive();
    +                final byte[] value = msg.getData();
    +
    +                if (value != null && value.length > 0) {
    +                    flowFile = session.create();
    +                    flowFile = session.write(flowFile, out -> {
    +                        out.write(value);
    +                    });
    +
    +                    session.getProvenanceReporter().receive(flowFile, "From " + context.getProperty(TOPIC).getValue());
    +                session.transfer(flowFile, REL_SUCCESS);
    --- End diff --
    
    Indentation is off starting here.


---

[GitHub] nifi pull request #2553: Nifi 4908 rebase

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2553#discussion_r174880893
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorCompletionService;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.nifi.pulsar.PulsarConsumer;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.ConsumerConfiguration;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +import org.apache.pulsar.client.api.SubscriptionType;
    +
    +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@CapabilityDescription("Consumes messages from Apache Pulsar "
    +        + "The complementary NiFi processor for sending messages is PublishPulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
    +
    +    static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name");
    +    static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages");
    +    static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer "
    +            + "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages");
    +
    +    protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
    +            .name("topic")
    +            .displayName("Topic Name")
    +            .description("The name of the Pulsar Topic.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
    +            .name("Subscription")
    +            .displayName("Subscription Name")
    +            .description("The name of the Pulsar subscription to consume from.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor ASYNC_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Async Enabled")
    +            .description("Control whether the messages will be consumed asyncronously or not. Messages consumed"
    +                    + " syncronously will be acknowledged immediately before processing the next message, while"
    +                    + " asyncronous messages will be acknowledged after the Pulsar broker responds.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_ASYNC_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum Async Requests")
    +            .description("The maximum number of outstanding asynchronous consumer requests for this processor. "
    +                    + "Each asynchronous call requires memory, so avoid setting this value to high.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor ACK_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Acknowledgment Timeout")
    +            .description("Set the timeout (in milliseconds) for unacked messages, truncated to the "
    +                    + "nearest millisecond. The timeout needs to be greater than 10 seconds.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .defaultValue("10000")
    +            .build();
    +
    +    public static final PropertyDescriptor PRIORITY_LEVEL = new PropertyDescriptor.Builder()
    +            .name("Consumer Priority Level")
    +            .description("Sets priority level for the shared subscription consumers to which broker "
    +                    + "gives more priority while dispatching messages. Here, broker follows descending "
    +                    + "priorities. (eg: 0=max-priority, 1, 2,..) ")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5")
    +            .build();
    +
    +    public static final PropertyDescriptor RECEIVER_QUEUE_SIZE = new PropertyDescriptor.Builder()
    +            .name("Consumer receive queue size.")
    +            .description("The consumer receive queue controls how many messages can be accumulated "
    +                    + "by the Consumer before the application calls Consumer.receive(). Using a higher "
    +                    + "value could potentially increase the consumer throughput at the expense of bigger "
    +                    + "memory utilization. \n"
    +                    + "Setting the consumer queue size as zero, \n"
    +                    + "\t - Decreases the throughput of the consumer, by disabling pre-fetching of messages. \n"
    +                    + "\t - Doesn't support Batch-Message: if consumer receives any batch-message then it closes consumer "
    +                    + "connection with broker and consumer will not be able receive any further message unless batch-message "
    +                    + "in pipeline is removed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final PropertyDescriptor SUBSCRIPTION_TYPE = new PropertyDescriptor.Builder()
    +            .name("Subscription Type")
    +            .description("Select the subscription type to be used when subscribing to the topic.")
    +            .required(false)
    +            .allowableValues(EXCLUSIVE, SHARED, FAILOVER)
    +            .defaultValue(SHARED.getValue())
    +            .build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    // Reuse the same consumer for a given topic / subscription
    +    private PulsarConsumer consumer;
    +    private ConsumerConfiguration consumerConfig;
    +
    +    // Pool for running multiple consume Async requests
    +    ExecutorService pool;
    +    ExecutorCompletionService<Message> completionService;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void init(ProcessContext context) {
    +            pool = Executors.newFixedThreadPool(context.getProperty(MAX_ASYNC_REQUESTS).asInteger());
    +            completionService = new ExecutorCompletionService<>(pool);
    +    }
    +
    +    @OnUnscheduled
    +    public void shutDown() {
    +            // Stop all the async consumers
    +            pool.shutdownNow();
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        try {
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    --- End diff --
    
    You don't have to call `isSet` here because it's a required field.


---