Posted to issues@nifi.apache.org by david-streamlio <gi...@git.apache.org> on 2018/04/06 22:46:19 UTC

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

GitHub user david-streamlio opened a pull request:

    https://github.com/apache/nifi/pull/2614

    Added Apache Pulsar Processors and Controller Service

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/david-streamlio/nifi NIFI-4914

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2614.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2614
    
----
commit bcfdae70506e4b1f2aab03a8bcfc9f55f859597c
Author: David Kjerrumgaard <da...@...>
Date:   2018-04-06T22:42:30Z

    Added Apache Pulsar Processors and Controller Service

----


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    @david-streamlio don't feel rushed, I was just checking in to see if you were still tracking this.


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    My latest commit passed all the tests and shows no conflicts with the base branch, but you are seeing "an explosion of conflicts", so I am confused. What steps do you need me to perform to get this PR into an acceptable state?


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    Probably needed a git rebase --continue instead...


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183235033
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java ---
    @@ -0,0 +1,294 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.pulsar.client.api.MessageId;
    +import org.apache.pulsar.client.api.Producer;
    +
    +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"})
    +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. "
    +    + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
    +    + "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to "
    +        + "FlowFiles that are routed to success.")
    +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class})
    +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor {
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(RECORD_READER);
    +        properties.add(RECORD_WRITER);
    +        properties.add(TOPIC);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(BATCHING_ENABLED);
    +        properties.add(BATCHING_MAX_MESSAGES);
    +        properties.add(BATCH_INTERVAL);
    +        properties.add(BLOCK_IF_QUEUE_FULL);
    +        properties.add(COMPRESSION_TYPE);
    +        properties.add(MESSAGE_ROUTING_MODE);
    +        properties.add(PENDING_MAX_MESSAGES);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        if (StringUtils.isBlank(topic)) {
    +            getLogger().error("Invalid topic specified {}", new Object[] {topic});
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        // Read the contents of the FlowFile into a byte array
    +        final byte[] messageContent = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, messageContent, true);
    +            }
    +        });
    +
    +        // Nothing to do, so skip this FlowFile.
    +        if (messageContent == null || messageContent.length < 1) {
    +            session.transfer(flowFile, REL_SUCCESS);
    +            return;
    --- End diff --
    
    I would put `msg.count = 0` on this before sending it out so that people checking through the provenance history can find that this flowfile had no records in it.
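
    A minimal sketch of that suggestion (`session.putAttribute` returns the updated FlowFile; the "msg.count" name matches this processor's @WritesAttribute, and `updated` is a hypothetical local since `flowFile` is declared final here):

        // Tag the empty FlowFile so provenance shows zero records were sent
        FlowFile updated = session.putAttribute(flowFile, "msg.count", "0");
        session.transfer(updated, REL_SUCCESS);
        return;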


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183234539
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.function.BiConsumer;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +
    +@CapabilityDescription("Consumes messages from Apache Pulsar specifically built against the Pulsar 1.x Consumer API. "
    +        + "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_0. Please note that, at this time, "
    +        + "the Processor assumes that all records that are retrieved from a given partition have the same schema. If any "
    +        + "of the Pulsar messages that are pulled but cannot be parsed or written with the configured Record Reader or "
    +        + "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the "
    +        + "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual "
    +        + "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the "
    +        + "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.")
    +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "record.count", description = "The number of records received")
    +})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, PublishPulsarRecord_1_X.class})
    +public class ConsumePulsarRecord_1_X extends AbstractPulsarConsumerProcessor {
    +
    +    public static final String MSG_COUNT = "record.count";
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum Async Requests")
    +            .description("The number of records to combine into a single flow file.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
    +            .name("parse_failure")
    +            .description("FlowFiles for which the content was not prasable")
    +            .build();
    +
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(RECORD_READER);
    +        properties.add(RECORD_WRITER);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +        properties.add(BATCH_SIZE);
    +        properties.add(MAX_WAIT_TIME);
    +
    +        // Freeze the list only after all descriptors have been added
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_PARSE_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER)
    +                .asControllerService(RecordReaderFactory.class);
    +
    +        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER)
    +                .asControllerService(RecordSetWriterFactory.class);
    +
    +        List<Message> messages = null;
    +        try {
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +                // Launch consumers
    +                consumeAsync(context, session);
    +
    +                // Handle completed consumers
    +                messages = handleAsync(context, session);
    +
    +            } else {
    +               messages = consume(context, session);
    +            }
    +
    +            processMessages(context, session, messages, readerFactory, writerFactory, context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean());
    +
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +
    +    }
    +
    +    /**
    +     * Pull messages off of the topic until we have reached BATCH_SIZE or MAX_WAIT_TIME,
    +     * whichever occurs first.
    +     */
    +    private List<Message> consume(ProcessContext context, ProcessSession session) throws PulsarClientException {
    +
    +        // Evaluate the timeout in milliseconds so it can be compared against System.currentTimeMillis() below
    +        final Integer queryTimeout = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
    +
    +        Consumer consumer = getWrappedConsumer(context).getConsumer();
    +
    +        List<Message> messages = new ArrayList<Message>();
    +
    +        long startTime = System.currentTimeMillis();
    +
    +        while ( (messages.size() < context.getProperty(BATCH_SIZE).asInteger())
    +                && (queryTimeout == 0 || System.currentTimeMillis() - startTime < queryTimeout ) ) {
    +            messages.add(consumer.receive());
    +        }
    +
    +        return messages;
    +    }
    +
    +    private void processMessages(ProcessContext context, ProcessSession session, List<Message> messages, RecordReaderFactory readerFactory,
    +        RecordSetWriterFactory writerFactory, boolean async) throws PulsarClientException {
    +
    +        if (messages.isEmpty()) {
    +            return;
    +        }
    +
    +        final AtomicLong messagesReceived = new AtomicLong(0L);
    +
    +        final BiConsumer<Message, Exception> handleParseFailure = (msg, e) -> {
    +            FlowFile failureFlowFile = session.create();
    +            if (msg.getData() != null) {
    +               failureFlowFile = session.write(failureFlowFile, out -> out.write(msg.getData()));
    +            }
    +            session.transfer(failureFlowFile, REL_PARSE_FAILURE);
    +        };
    +
    +        RecordSetWriter writer = null;
    +        FlowFile flowFile = null;
    +        OutputStream rawOut = null;
    +
    +        for (Message msg: messages)  {
    +            RecordReader reader = getRecordReader(msg, readerFactory, handleParseFailure);
    +            Record firstRecord = getFirstRecord(msg, reader, handleParseFailure);
    +
    +            if (firstRecord == null) {
    +                // If the message doesn't contain any record, just ack the message
    +                if (async) {
    +                   ackService.submit(new Callable<Void>() {
    +                      @Override
    +                      public Void call() throws Exception {
    +                         return getWrappedConsumer(context).getConsumer().acknowledgeAsync(msg).get();
    +                      }
    +                   });
    +                } else {
    +                   consumer.getConsumer().acknowledge(msg);
    +                }
    +
    +                continue;
    +            }
    +
    +            // Session / FlowFile starts here
    +            if (flowFile == null) {
    +                flowFile = session.create();
    +                rawOut = session.write(flowFile);
    +            }
    +
    +            // Create the Record Writer
    +            if (writer == null) {
    +                try {
    +                    writer = getRecordWriter(writerFactory, firstRecord.getSchema(), rawOut);
    +                    writer.beginRecordSet();
    +                } catch (SchemaNotFoundException | IOException ex) {
    +                    getLogger().error("Failed to obtain Schema for FlowFile.", ex);
    +                    throw new ProcessException(ex);
    +                }
    +            }
    +
    +            // Read all the records from this message, as it may contain several
    +            try {
    +                for (Record record = firstRecord; record != null; record = reader.nextRecord()) {
    +                    writer.write(record);
    +                    messagesReceived.incrementAndGet();
    +                }
    +            } catch (MalformedRecordException | IOException mEx) {
    +                handleParseFailure.accept(msg, mEx);
    +            }
    +
    +            // Acknowledge the message
    +            if (async) {
    +               ackService.submit(new Callable<Void>() {
    +                   @Override
    +                   public Void call() throws Exception {
    +                      return getWrappedConsumer(context).getConsumer().acknowledgeAsync(msg).get();
    +                   }
    +               });
    +            } else {
    +                  consumer.getConsumer().acknowledge(msg);
    +            }
    +        }
    +
    +        // Clean-up and transfer session
    +        try {
    +            if (writer != null)
    +               writer.finishRecordSet();
    +
    +            if (rawOut != null)
    --- End diff --
    
    Needs curly brackets.
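
    For reference, a braced sketch of that clean-up block (the statement after the `rawOut != null` check is cut off by the diff, so `rawOut.close()` is an assumption):

        if (writer != null) {
            writer.finishRecordSet();
        }
        if (rawOut != null) {
            rawOut.close();  // assumed: close the raw session output stream
        }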


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183209058
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.pulsar.client.api.ClientConfiguration;
    +import org.apache.pulsar.client.api.PulsarClient;
    +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
    +import org.apache.pulsar.client.impl.auth.AuthenticationTls;
    +
    +@Tags({ "Pulsar"})
    +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.")
    +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool {
    +
    +    public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor
    +            .Builder().name("PULSAR_SERVICE_URL")
    +            .displayName("Pulsar Service URL")
    +            .description("URL for the Pulsar cluster, e.g localhost:6650")
    +            .required(true)
    +            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
    --- End diff --
    
    Consider adding `expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)`.
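
    A sketch of the descriptor with that addition (assumes an import of org.apache.nifi.expression.ExpressionLanguageScope, available as of NiFi 1.7):

        public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor.Builder()
                .name("PULSAR_SERVICE_URL")
                .displayName("Pulsar Service URL")
                .description("URL for the Pulsar cluster, e.g. localhost:6650")
                .required(true)
                .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
                // allow Expression Language against the variable registry, per the suggestion above
                .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
                .build();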


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    @david-streamlio I'm in the process of working through a code review now. In the meantime, I found that you did not add the NAR reference to nifi-assembly/pom.xml. Can you correct that?


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    > Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.20.1:test (default-test) on project nifi-pulsar-processors: There are test failures.
    
    Related to how you set up the expression language support. The boolean version of the method call is deprecated and replaced with one that uses an enum in 1.7.
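
    In other words, a sketch of the change (the enum is org.apache.nifi.expression.ExpressionLanguageScope; pick the scope that matches how each property is evaluated):

        // Deprecated boolean form:
        .expressionLanguageSupported(true)

        // Replacement as of NiFi 1.7:
        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)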


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183234849
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +
    +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@CapabilityDescription("Consumes messages from Apache Pulsar "
    +        + "The complementary NiFi processor for sending messages is PublishPulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor {
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +        properties.add(MAX_WAIT_TIME);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        try {
    +            if (context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +                // Launch consumers
    +                consumeAsync(context, session);
    +
    +                // Handle completed consumers
    +                handleAsync(context, session);
    +
    +            } else {
    +                consume(context, session);
    +            }
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +
    +    }
    +
    +    private void handleAsync(ProcessContext context, ProcessSession session) {
    +
    +        try {
    +            Future<Message> done = consumerService.take();
    +            Message msg = done.get();
    +
    +            if (msg != null) {
    +                FlowFile flowFile = null;
    +                final byte[] value = msg.getData();
    +                if (value != null && value.length > 0) {
    +                    flowFile = session.create();
    +                    flowFile = session.write(flowFile, out -> {
    +                        out.write(value);
    +                    });
    +
    +                    session.getProvenanceReporter().receive(flowFile, "From " + getWrappedConsumer(context).getTransitURL());
    +                    session.transfer(flowFile, REL_SUCCESS);
    +                    session.commit();
    +                    getWrappedConsumer(context).getConsumer().acknowledgeAsync(msg);
    +                }
    +            }
    +
    +        } catch (InterruptedException | ExecutionException | PulsarClientException e) {
    +            getLogger().error("Trouble consuming messages ", e);
    +        }
    +
    +    }
    +
    +    @OnStopped
    +    public void close(final ProcessContext context) {
    +
    +        getLogger().info("Disconnecting Pulsar Consumer");
    +        if (consumer != null) {
    +
    +            context.getProperty(PULSAR_CLIENT_SERVICE)
    +                .asControllerService(PulsarClientPool.class)
    +                .getConsumerPool().evict(consumer);
    +        }
    +
    +        consumer = null;
    +    }
    +
    +    /*
    +     * When this Processor expects to receive many small files, it may
    +     * be advisable to create several FlowFiles from a single session
    +     * before committing the session. Typically, this allows the Framework
    +     * to treat the content of the newly created FlowFiles much more efficiently.
    +     */
    +    private void consume(ProcessContext context, ProcessSession session) throws PulsarClientException {
    +
    +        Consumer consumer = getWrappedConsumer(context).getConsumer();
    +
    +        final ComponentLog logger = getLogger();
    +        final Message msg;
    +        FlowFile flowFile = null;
    +
    +        try {
    +
    +            msg = consumer.receive();
    +            final byte[] value = msg.getData();
    +
    +            if (value != null && value.length > 0) {
    +                flowFile = session.create();
    +                flowFile = session.write(flowFile, out -> {
    +                    out.write(value);
    +                });
    +
    +                session.getProvenanceReporter().receive(flowFile, "From " + context.getProperty(TOPIC).getValue());
    +                session.transfer(flowFile, REL_SUCCESS);
    +                logger.info("Created {} from {} messages received from Pulsar Server and transferred to 'success'",
    +                        new Object[]{flowFile, 1});
    +
    +                session.commit();
    +
    +                /*
    +                 * This Processor acknowledges receipt of the data and/or removes the data
    +                 * from the external source in order to prevent receipt of duplicate files.
    +                 * This is done only after the ProcessSession by which the FlowFile was created
    +                 * has been committed! Failure to adhere to this principle may result in data
    +                 * loss, as restarting NiFi before the session has been committed will result
    +                 * in the temporary file being deleted. Note, however, that it is possible using
    +                 * this approach to receive duplicate data because the application could be
    +                 * restarted after committing the session and before acknowledging or removing
    +                 * the data from the external source. In general, though, potential data duplication
    +                 * is preferred over potential data loss.
    +                 */
    +                getLogger().info("Acknowledging message " + msg.getMessageId());
    +                consumer.acknowledge(msg);
    +
    +            } else {
    +                // We didn't consume any data, so just commit the session
    +                session.commit();
    --- End diff --
    
    You don't need to worry about this, AbstractProcessor will do it for you.
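
    For context, a paraphrased sketch of what AbstractProcessor does around each onTrigger call (not the exact framework source):

        final ProcessSession session = sessionFactory.createSession();
        try {
            onTrigger(context, session);
            session.commit();   // committed by the framework on normal return
        } catch (final Throwable t) {
            session.rollback(true);
            throw t;
        }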


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183235163
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsar_1_X.java ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.pulsar.client.api.MessageId;
    +import org.apache.pulsar.client.api.Producer;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +
    +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
    +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the Pulsar 1.X Producer API."
    +    + "The messages to send may be individual FlowFiles or may be delimited, using a "
    +    + "user-specified delimiter, such as a new-line. "
    +    + "The complementary NiFi processor for fetching messages is ConsumePulsar_1_X.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to "
    +        + "FlowFiles that are routed to success.")
    +public class PublishPulsar_1_X extends AbstractPulsarProducerProcessor {
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(BATCHING_ENABLED);
    +        properties.add(BATCHING_MAX_MESSAGES);
    +        properties.add(BATCH_INTERVAL);
    +        properties.add(BLOCK_IF_QUEUE_FULL);
    +        properties.add(COMPRESSION_TYPE);
    +        properties.add(MESSAGE_ROUTING_MODE);
    +        properties.add(PENDING_MAX_MESSAGES);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        FlowFile flowFile = session.get();
    +
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final ComponentLog logger = getLogger();
    +        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        if (StringUtils.isBlank(topic)) {
    +            logger.error("Invalid topic specified {}", new Object[] {topic});
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        // Read the contents of the FlowFile into a byte array
    +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +        session.exportTo(flowFile, baos);
    +
    +        final byte[] messageContent = baos.toByteArray();
    +        // Nothing to do, so skip this FlowFile.
    +        if (messageContent == null || messageContent.length < 1) {
    +            session.transfer(flowFile, REL_SUCCESS);
    --- End diff --
    
    I think you should have `msg.count = 0` here.
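
    The same sketch as on PublishPulsarRecord_1_X applies here; `flowFile` is not final in this method, so it can be reassigned directly:

        flowFile = session.putAttribute(flowFile, "msg.count", "0");
        session.transfer(flowFile, REL_SUCCESS);
        return;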


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183209071
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.pulsar.client.api.ClientConfiguration;
    +import org.apache.pulsar.client.api.PulsarClient;
    +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
    +import org.apache.pulsar.client.impl.auth.AuthenticationTls;
    +
    +@Tags({ "Pulsar"})
    +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.")
    +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool {
    +
    +    public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor
    +            .Builder().name("PULSAR_SERVICE_URL")
    +            .displayName("Pulsar Service URL")
    +            .description("URL for the Pulsar cluster, e.g localhost:6650")
    +            .required(true)
    +            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum concurrent lookup-requests")
    +            .description("Number of concurrent lookup-requests allowed on each broker-connection to prevent "
    +                    + "overload on broker. (default: 5000) It should be configured with higher value only in case "
    +                    + "of it requires to produce/subscribe on thousands of topics")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5000")
    --- End diff --
    
    Are you sure this is a sane default? Your description suggests that 5,000 is the upper end of where most users would want to be.


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    @david-streamlio Haven't heard from you in almost 3 weeks. Have you had a chance to work on any of the code review changes?


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    Also, run `git rebase --continue` on that branch and tell us what happens. Copy and paste the output as "code" in the comment box (the code Markdown helper is the <> button).


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    Davids-MacBook-Pro:nifi david$ git rebase
    First, rewinding head to replay your work on top of it...
    Applying: NIFI-4289 - InfluxDB Put processor
    Applying: NIFI-4827 Added support for reading queries from the flowfile body to GetMongo.
    Applying: NIFI-4910 Fixing slight spelling mistake in error message, needs a space
    Applying: NIFI-3502:
    Applying: NIFI-4876 - Adding Min Object Age to ListS3
    Applying: NIFI-4839 Creating nifi-toolkit-cli to provide a CLI for interacting with NiFi and NiFi Registry
    Applying: NIFI-4839 - Rename the registry group to `registry` for better UX
    Applying: NIFI-4839 - Fixing completer unit test
    Applying: NIFI-4839 - Implemented auto-layout when importing the PG. Will find an available spot on a canvas which doesn't overlap with other components and is as close to the canvas center as possible.
    Applying: NIFI-4839
    Applying: NIFI-4839 - Support both public URLs and local files as inputs for import actions.
    Applying: NIFI-4839 - Updating README and cleaning up descriptions and comments
    Applying: NIFI-4839 - Implemented nice dynamic table output for all list-XXX commands (in simple mode)
    Applying: NIFI-4839 - Added abbreviation in simple output for name, description, and comments
    Applying: NIFI-4839 - Fixed handling of a connection object position - it doesn't have one and just returns null (calculated by the UI dynamically)
    Applying: NIFI-4839 - Switching standalone mode to default to simple output
    Applying: NIFI-4839 - The "Disabled" column had incorrect size and skewed the header
    Applying: NIFI-4839 Improving back-ref support so that ReferenceResolver is passed the option being resolved
    Applying: NIFI-4880: Add the ability to map record based on the aliases. This closes #2474
    Applying: Fixed failing unit tests: Changed the queues used to unique names so that one test won't interfere with another; also changed JMSPublisherConsumerTest to JMSPublisherConsumerIT since it is an integration test between the publisher and consumer with ActiveMQ as the broker
    Applying: NIFI-4916 - ConvertExcelToCSVProcessor inherit parent attributes. This closes #2500.
    Applying: NIFI-2630 Allow PublishJMS to send TextMessages
    Applying: NIFI-2630: Changed name of queue in unit test to be unique in order to avoid getting messages from another test if the other tests fails to properly shutdown the connection. This closes #2458.
    Applying: NIFI-4920 Skipping sensitive properties when updating component properties from versioned component. This closes #2505.
    Applying: NIFI-4773 - Fixed column type map initialization in QueryDatabaseTable
    Applying: NIFI-4922 - Add badges to the README file
    Applying: NIFI-4872 Added annotation for specifying scenarios in which components can cause high usage of system resources.
    Applying: NIFI-4925:
    Applying: NIFI-4855:
    Applying: NIFI-4893 Cannot convert Avro schemas to Record schemas with default arrays
    Applying: NIFI-4928 Updated BouncyCastle dependencies to version 1.59.
    Applying: NIFI-4929 Converted the majority of MongoDB unit tests to integration tests so they can be reliably run with 'mvn -Pintegration-tests integration-test'
    Applying: NIFI-4948 - MongoDB Lookup Service throws an exception if there is no match
    Applying: NIFI-4859 Corrects Swagger Spec VersionedFlowState allowableValues
    Applying: NIFI-4835 Corrects Swagger spec response types in FlowResource
    Applying: NIFI-4949 Converted nifi-mongodb-services' unit tests into integration tests so that the @Ignore annotation doesn't have to be removed to make them run.
    Applying: NIFI-4870 Upgraded activemq-client and activemq-broker versions to 5.15.3.
    Applying: NIFI-4945:
    Applying: NIFI-4936 trying to quiet down the mvn output a bit so we dont exceed the travis-ci 4MB max
    Applying: NIFI-3039 Provenance Repository - Fix PurgeOldEvent and Rollover Size Limits
    Applying: NIFI-4960: fix object setting. This closes #2531.
    Applying: NIFI-4936:
    Applying: NIFI-4944: Guard against race condition in Snappy for PutHiveStreaming
    Applying: NIFI-4885:
    Applying: NIFI-4953 - FetchHBaseRow - update log level for "not found" to DEBUG instead of ERROR
    Applying: NIFI-4958 - This closes #2529. Fix Travis job status + atlas profile
    Applying: NIFI-4885 fixing checkstyle issue
    Applying: NIFI-4800 Expose the flattenMode as property in FlattenJSON processor
    Applying: NIFI-3402 - Added etag support to InvokeHTTP
    Applying: NIFI-4833 Add ScanHBase Processor
    Applying: NIFI-4774: Allow user to choose which write-ahead log implementation should be used by the WriteAheadFlowFileRepository
    Applying: NIFI-4969 Fixing error when importing versioned flow with a processor that uses event driven scheduling. This closes #2539
    Applying: NIFI-4864 Fixing Additional Resources property pointing at a directory won't find new JARs
    Applying: NIFI-4864 - Additional improvements to additonal resource loading
    Applying: NIFI-4968: fix printing indefinite log errors
    Applying: NIFI-4966 - JacksonCSVRecordReader - NPE with some CSV formats
    Applying: NIFI-4967 - CSVRecordReader does not read header with specific formats
    Applying: NIFI-4955 - Preserve columns ordering with CSV and ValidateRecord
    Applying: NIFI-4962 FlattenJson processor add unexpected backslash after flatten
    Applying: NIFI-4821 - Upgrade poi to 3.17
    Applying: NIFI-4972 - SelectHiveQL to emit FETCH provenance event
    Applying: NIFI-4976: If unable to retrieve message content, warn an error but acknowledge message.
    Applying: NIFI-4849: Implemented REST Endpoint and associated backend code to generate a Diagnostics Report for a Processor
    Applying: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask
    Applying: NIFI-4935 Refactoring to support specifying schema branch or schema version when using schema by name strategy
    Applying: Fixed checkstyle violation
    Applying: NIFI-4978: Fixed ReportLineageToAtlas NPE when unscheduled
    Applying: Changed some unit tests to Integration Tests because they are creating embedded JMS Brokers (implicitly) and commnicating with it. This becomes brittle in a full build, especially if any other unit test attempts to create a broker implicitly due to the way that the activemq broker implicit creation works.
    Applying: NIFI-4959: Remove flowfiles and close connection for Bad Requests causing IOException
    Applying: NIFI-4973:
    Applying: NIFI-4951: Update convertToAvroObject to use the DataTypeUtils conversion function
    Applying: NIFI-4950 Defining behavior for MergeContent when more than 1 FlowFile has the same fragment.index value
    Applying: NIFI-4918: looping over several methods to try and fit the dynamic attribute.
    Applying: NIFI-4973 Added license comment to new file RemoteProcessGroupEntityMergerTest.java
    Applying: NIFI-4981 Allow using FF's content as message in PutEmail
    Applying: NIFI-4777: get schema by id even if not latest
    Applying: NIFI-4912: This closes #2494. Update jackson version to latest stable version (2.9.4)
    Applying: NIFI-4902: This closes #2485. Updated ConsumeAMQP, PublishAMQP to use one connection per concurrent task instead of a single connection shared by all concurrent tasks. This offers far better throughput when the network latency is non-trivial. Also refactored to simplify code
    Applying: NIFI-4991 This closes #2564.
    Applying: NIFI-4979 - Fix ReportLineageToAtlas documentation errors
    Applying: NIFI-4557 Added Expression Language support for PGP private keyring passphrase in EncryptContent.
    Applying: Revert "NIFI-4809 - Implement a SiteToSiteMetricsReportingTask"
    Applying: NIFI-4995 release process exposed this item doesn't get updated by versions mechanism so using project version instead
    Applying: NIFI-4885:
    Applying: NIFI-4999: Added request URI into the "restlistener.request.uri" attribute
    Applying: NIFI-4917: Externalize Keytab and Principal configuration from Processors to a Controller Service. This gives us the ability to allow users to interact with those Keytabs/Principals to which they've been given access without allowing them access to all Keytabs and Principals
    Applying: NIFI-4882: Resolve issue with parsing custom date, time, and timestamp formats in CSV files
    Applying: NIFI-4989 Made PutMongo able to use nested lookup keys, a query param and multiple lookup keys.
    Applying: NIFI-4631: Use java.nio.file.Files in ListFile to improve performance
    Applying: NIFI-5011: Fixed threading bug in TestFileSystemRepository
    Applying: NIFI-5013 create test jar for standard processors
    Applying: NIFI-5009:
    Applying: NIFI-4864: Improvements to PR #2470
    Applying: NIFI-5012: When connecting to cluster, esure that controller services appropriately enabled/disabled
    Applying: NIFI-4743 - Added configurable null suppression to PutElasticsearchHttpRecord
    Applying: NIFI-4325 Added new processor that uses the JSON DSL.
    Applying: NIFI-4977 Adding expression language support to the Sender properties of PutSyslog
    Applying: NIFI-4995 updating copyright year on all notices
    Applying: NIFI-4388: Modules Not Honored
    Applying: NIFI-4895: Added backend code to give users the ability to forcibly terminate a processor if the processor does not complete after clicking Stop
    Applying: NIFI-4993: Accept null user at querying ProvenanceRepository
    Applying: [NIFI-4998] update node and npm version
    Applying: NIFI-5030: If ControlRate encounters a FlowFile that cannot be transferred, it should continue processing other FlowFiles that have different group attribute values
    Applying: NIFI-5026: Refactored StandardPreparedQuery so that instead of a List of Strings that may or may not correspond to compiled expressions and a Map of String to Compiled Expression, the StandardPreparedQuery now just takes a List of Expression objects, and those Expressions can be evaluated to return the proper result
    Applying: NIFI-5032: This closes #2596.
    Applying: NIFI-5017 - ConvertExcelToCSVProcessor - EL support for rows and columns to skip
    Applying: NIFI-5021 Moving nifi-elasticsearch-client-service-api to nifi-elasticsearch-bundle and creating NAR for the API
    Applying: NIFI-5033:
    Applying: NIFI-4883: Updated ValidateRecord to allow an optional Record Writer to be configured for invalid records that differs from the Record Writer used for writing valid records
    Applying: NIFI-5035 Moving MongoDB services out of standard services
    Applying: NIFI-4658 set Maximum Number of Entries to required and allow FlowFiles having fragment.count greater than Max Entries property
    Applying: NIFI-5037 - ConsumeEWS Fails to Read Emtpy Message Body Emails
    Applying: NIFI-5034:
    Applying: NIFI-5034:
    Applying: NIFI-4788 This closes #2427. Exposing nifi.web.proxy.host configuration for Docker containers.
    Applying: NIFI-4995-RC3 prepare release nifi-1.6.0-RC3
    Applying: NIFI-4995-RC3 prepare for next development iteration
    Applying: NIFI-4932: Enable S2S work behind a Reverse Proxy
    Applying: NIFI-5027 Adding commands pg-get-services, pg-enable-services, and pg-disable-services
    Applying: NIFI-3599 Allow back pressure object count and data size to be configurable in nifi.properties. This closes #2497
    Applying: NIFI-5043: TailFile in Multifile mode should not open new readers in onTrigger
    Applying: NIFI-5045: Fixed error code handling in PutHiveQL. This closes #2608
    Applying: NIFI-4149 - Indicate if EL is evaluated against FFs or not
    Applying: NIFI-4149: Minor tweaks to verbiage
    Applying: NIFI-4149 - fixed contrib-check errors
    Applying: NIFI-5048 Corrected typo in KeytabCredentialsService controller service description
    Applying: NIFI-5047 Fixed a bug in PutMongo that prevented it from working if the mode is insert and the query/query key are not set.
    Applying: NIFI-5050 update frontend-maven-plugin
    Applying: NIFI-4995 update docker image and text info for next release
    Applying: NIFI-4862: Keep incoming flowfile attributes in outgoing flowfiles from SelectHiveQL
    Applying: NIFI-5042 Added section Restricted Components in Versioned Flows and edited related section in Adding Components to the Canvas
    Applying: NIFI-4857: Support String<->byte[] conversion for records
    Applying: NIFI-4941 Updated nifi.sensitive.props.additional.keys description to refer to nifi.properties
    Applying: NIFI-5063 Added screenshots and supporting text for Primary Node processors
    Applying: NIFI-5055 added ability to unpenalize MockFlowFile directly or from MockProcessSession
    Applying: NIFI-5065: HTTP Site-to-Site fails sending data more than 2GB at once
    Applying: NIFI-4927 - InfluxDB Query Processor
    Applying: NIFI-5074 Added section Variables in Versioned Flows to User Guide
    Applying: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask
    Applying: NIFI-4997:
    Applying: NIFI-1706: Extend QueryDatabaseTable to support arbitrary queries
    Applying: NIFI-4942 [WIP] Added skeleton for secure hash handling in encrypt-config toolkit. Added test resource for Python scrypt implementation/verifier. Added unit tests.
    Applying: NIFI-5089: Exclude Maven progress output from Travis builds
    Applying: NIFI-5085 In InvokeHttp, moving the OkHttp Response object to a try with resources
    Applying: NIFI-4516 Added QuerySolr after rebase
    Applying: NIFI-4647: Fix support for strings in unions for ConvertAvroToORC
    Applying: NIFI-5097 This closes #2647. add client-side HTTP redirect to error index.jsp
    Applying: NIFI-4942 Fixes Travis CI build
    Applying: NIFI-5062: Removed hbase-client dependecy from hbase bundle
    Applying: NIFI-5095: Suppress SET property parse failure at Hive processors
    Applying: NIFI-5078 - added source/destination connection info in S2SStatusRT
    Applying: NIFI-5096: Periodically poll ZooKeeper to determine the leader for each registered role in Leader Election. This avoids a condition whereby a node may occasionally fail to receive notification that it is no longer the elected leader.
    Applying: Added provenance reporting
    Applying: NIFI-5082: Added support for custom Oracle timestamp types to Avro conversion
    Applying: NIFI-5099 - Update flow differences when updating a connection
    Applying: NIFI-5075: Do not execute Funnels with no outgoing connections
    Applying: NIFI-4185: Add XML Record Reader
    Applying: NIFI-4185: Minor tweaks to XML Record Reader, around documentation and error handling
    Applying: NIFI-3576 Support for QueryInfo relationship, can be used to track no-hits
    Applying: NIFI-4561 ExecuteSQL returns no FlowFile for some queries
    Applying: NIFI-4035 - Adding PutSolrRecord Processor that reads NiFi records and indexes them into Solr as SolrDocuments
    Applying: NIFI-5108 Updated all explicit refs and media nar usage of commons-compress to latest version and updated spring redis client
    Applying: NIFI-5000 - ListHDFS properly lists files from updated directory path
    Applying: NIFI-5015: Implemented Azure Queue Storage processors
    Applying: NIFI-5066:
    Applying: NIFI-4923 Updated nifi-hadoop-libraries-nar, nifi-hdfs-processors, and nifi-hadoop-utils dependency on hadoop-client from 2.7.3 to 3.0.0
    Applying: NIFI-5060 Updated SubstringAfter record processing to support multi-character search trimming
    Applying: NIFI-1295:
    Applying: NIFI-5092 - Removed local state management for S2S Bulletins RT
    Applying: NIFI-5120 AbstractListenEventProcessor supports expression language
    Applying: NIFI-5116 Implemented logic to translate nifi.properties file to CLI properties format.
    Applying: NIFI-5123: Move SchemaRegistryService to nifi-avro-record-utils
    Applying: NIFI-5090: Create move target dir dynamically at FetchFTP and FetchSFTP
    Applying: NIFI-5105: Improvements for nifi-aws-bundle
    Applying: NIFI-5129: AWS Processors now displays proper region names
    Applying: NIFI-5124:
    Applying: NIFI-4456: Support multiple JSON objects in JSON record reader/writers
    Applying: NIFI-5142: Do not allow a connection's destination to be changed to a funnel if the source is the same funnel. Also fixed some typos in StandardFunnel. This closes #2669
    Applying: NIFI-4393: Handle database specific identifier escape characters
    Applying: NIFI-5134 Explicitly requesting UGI to relogin before attempting to get a DB connection in HiveConnectionPool
    Applying: NIFI-5130 ExecuteInfluxDBQuery processor chunking support
    Applying: NIFI-4980: Typo in ReportAtlasLineage kafka kerberos service name property
    Applying: NIFI-5135:
    Applying: NIFI-5150: Fixed bug that caused StandardProcessSession.append() to copy too much data when called on an incoming flowfile
    Applying: NIFI-5152: MoveHDFS now works even with no upstream connection
    Applying: NIFI-5153: If a node is disconnected due to failure to complete mutable request, the node should be allowed to rejoin
    Applying: NIFI-5154: When Processor or Controller Service is added to a Process Group, remove any references from it to any other Controller Service that is not reachable from the newly assigned Process Group
    Applying: NIFI-5163 Clearing version control info when creating a template
    Applying: NIFI-5161 - Moved filename escaping method to TlsHelper.java to allow use by the different Tls modes.
    Applying: NIFI-5136 Ensure processor references are removed from LogRepository and from ProcessScheduler
    Applying: NIFI-5146 Only support HTTP or HTTPS operation for NiFi API/UI
    Applying: NIFI-5156: Updated GCP SDK to latest version
    Applying: NIFI-5138: Bug fix to ensure that when we have a CHOICE between two or more REOCRD types that we choose the appropriate RECORD type when creating the Record in the JSON Reader.
    Applying: NIFI-5168 - ReplaceText Processor Should Use Single FlowFile Processing Instead of Batch
    Applying: NIFI-5064 - Fixes and improvements to PutKudu processor
    Applying: NIFI-4942 This closes #2690. Resolved test failures in JCE limited mode.
    Applying: NIFI-5167:
    Applying: NIFI-5180: update JMS additional details to set Destination Type to Required, default 'QUEUE'
    Applying: NIFI-4938 Upgraded org.eclipse.paho.client.mqttv3 dependency version to 1.2.0
    Applying: NIFI-4637 Added support for visibility labels to the HBase processors.
    Applying: NIFI-4637: HBase visibility lables
    Applying: NIFI-5121: Added DBCPService API method for passing in flow file attributes when available
    Applying: NIFI-5049 Fix handling of Phonenix datetime columns
    Davids-MacBook-Pro:nifi david$


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    Huh, I just checked out your latest branch and it's clean as a whistle WRT rebasing and appears 100% up to date w/ the upstream master. So not sure what the heck is going on here. Try doing a `git push origin --force NIFI-4914`. That should forcefully knock out whatever is going wrong.
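
    For reference, the full sequence after a rebase usually looks like this (assuming the fork remote is named `origin` and the branch is NIFI-4914; a rebase rewrites the commit IDs, so a plain push gets rejected as non-fast-forward):

        git checkout NIFI-4914
        git rebase master
        git push --force origin NIFI-4914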


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    Yeah, I think @mattyb149 is right about that. I just pulled your branch again, did a rebase against master and there was an explosion of conflicts. Once you resolve those, you have to do `git rebase --continue` to finish the rebasing.
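
    For anyone hitting the same thing, the conflict-resolution loop is roughly (a sketch; the file names will vary):

        git rebase master
        # edit each file that `git status` reports as conflicted
        git add <each resolved file>
        git rebase --continue
        # repeat the edit/add/continue steps until the rebase finishes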


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    @joewitt something changed with the way expression language support is declared in the `PropertyDescriptor.Builder`. TL;DR when you use the deprecated `expressionLanguageSupported(boolean)` method, it doesn't seem to set up the new internal structure correctly.
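
    For context, the new builder style looks roughly like this (a sketch, assuming the `ExpressionLanguageScope` enum introduced by NIFI-4149; `TOPIC` here is just an illustrative property, not one from this PR):

        import org.apache.nifi.components.PropertyDescriptor;
        import org.apache.nifi.expression.ExpressionLanguageScope;
        import org.apache.nifi.processor.util.StandardValidators;

        public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
                .name("topic")
                .displayName("Topic Name")
                .required(true)
                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                // Deprecated boolean form; appears not to populate the new internal EL scope:
                // .expressionLanguageSupported(true)
                // New form, which declares what the expression is evaluated against:
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .build();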


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183209029
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-nar/src/main/resources/META-INF/NOTICE ---
    @@ -0,0 +1,612 @@
    +nifi-druid-controller-service-api-nar
    --- End diff --
    
    Needs to be changed to the pulsar client service.


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    I don't see an upstream branch labeled 1.6.0....which upstream branch should I merge into my local branch?


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    One single squashed commit, with both NIFI-4908 & NIFI-4914 changes in it. 
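
    (For anyone repeating this: one way to get there is an interactive rebase, e.g. `git rebase -i master`, marking every commit after the first as `squash`.)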


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    Abandoning this PR in favor of PR #2702
    
    https://github.com/apache/nifi/pull/2702


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183208823
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/PoolableResource.java ---
    @@ -0,0 +1,43 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar.pool;
    +
    +/**
    + * Service interface for any object that can be pooled for re-use, which
    + * defines methods for closing the object, effectively marking it no longer
    + * usable.
    + *
    + * @author david
    --- End diff --
    
    Please remove.
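
    (ASF projects generally avoid `@author` tags; authorship is tracked through the commit history instead.)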


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183234762
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +
    +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@CapabilityDescription("Consumes messages from Apache Pulsar "
    +        + "The complementary NiFi processor for sending messages is PublishPulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor {
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +        properties.add(MAX_WAIT_TIME);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        try {
    +            if (context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +                // Launch consumers
    +                consumeAsync(context, session);
    +
    +                // Handle completed consumers
    +                handleAsync(context, session);
    +
    +            } else {
    +                consume(context, session);
    +            }
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +
    +    }
    +
    +    private void handleAsync(ProcessContext context, ProcessSession session) {
    +
    +        try {
    +            Future<Message> done = consumerService.take();
    +            Message msg = done.get();
    +
    +            if (msg != null) {
    +                FlowFile flowFile = null;
    +                final byte[] value = msg.getData();
    +                if (value != null && value.length > 0) {
    +                    flowFile = session.create();
    +                    flowFile = session.write(flowFile, out -> {
    +                        out.write(value);
    +                    });
    +
    +                   session.getProvenanceReporter().receive(flowFile, "From " + getWrappedConsumer(context).getTransitURL());
    +                   session.transfer(flowFile, REL_SUCCESS);
    +                   session.commit();
    --- End diff --
    
    You might want to consider doing this every X number of flowfiles or based on a timer.
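
    Something like this, for example (a sketch only; `BATCH_COMMIT_SIZE` and the counter are hypothetical names, not part of this PR):

        import org.apache.nifi.processor.ProcessSession;

        // Commit the session once per N received messages rather than once per
        // message, trading a little latency for much less session overhead.
        private static final int BATCH_COMMIT_SIZE = 100;
        private int uncommittedCount = 0;

        private void maybeCommit(final ProcessSession session) {
            if (++uncommittedCount >= BATCH_COMMIT_SIZE) {
                session.commit();
                uncommittedCount = 0;
            }
        }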


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    Followed all of the steps as you suggested. Performed a mvn -Pcontrib-check clean install without errors. 
    
    Now we are failing CI due to missing content under the nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/ folder.


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183208835
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.controller.ControllerService;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +
    +
    +@Tags({"Pulsar"})
    --- End diff --
    
    Could use others like "client" and "pool."
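
    For example: `@Tags({"Pulsar", "client", "pool"})`.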


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183234732
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.function.BiConsumer;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +
    +@CapabilityDescription("Consumes messages from Apache Pulsar specifically built against the Pulsar 1.x Consumer API. "
    +        + "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_0. Please note that, at this time, "
    +        + "the Processor assumes that all records that are retrieved from a given partition have the same schema. If any "
    +        + "of the Pulsar messages that are pulled but cannot be parsed or written with the configured Record Reader or "
    +        + "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the "
    +        + "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual "
    +        + "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the "
    +        + "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.")
    +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "record.count", description = "The number of records received")
    +})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, PublishPulsarRecord_1_X.class})
    +public class ConsumePulsarRecord_1_X extends AbstractPulsarConsumerProcessor {
    +
    +    public static final String MSG_COUNT = "record.count";
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum Async Requests")
    +            .description("The number of records to combine into a single flow file.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
    +            .name("parse_failure")
    +            .description("FlowFiles for which the content was not prasable")
    +            .build();
    +
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(RECORD_READER);
    +        properties.add(RECORD_WRITER);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +        properties.add(BATCH_SIZE);
    +        properties.add(MAX_WAIT_TIME);
    +
    +        // Assign the unmodifiable view only after all of the properties have been added
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_PARSE_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER)
    +                .asControllerService(RecordReaderFactory.class);
    +
    +        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER)
    +                .asControllerService(RecordSetWriterFactory.class);
    +
    +        List<Message> messages = null;
    +        try {
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +                // Launch consumers
    +                consumeAsync(context, session);
    +
    +                // Handle completed consumers
    +                messages = handleAsync(context, session);
    +
    +            } else {
    +               messages = consume(context, session);
    +            }
    +
    +            processMessages(context, session, messages, readerFactory, writerFactory, context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean());
    +
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +
    +    }
    +
    +    /**
    +     * Pull messages off of the topic until we have reached BATCH_SIZE or
    +     * exceeded MAX_WAIT_TIME, whichever occurs first.
    +     */
    +    private List<Message> consume(ProcessContext context, ProcessSession session) throws PulsarClientException {
    +
    +        final Integer queryTimeout = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
    +
    +        Consumer consumer = getWrappedConsumer(context).getConsumer();
    +
    +        List<Message> messages = new ArrayList<Message>();
    +
    +        long startTime = System.currentTimeMillis();
    +
    +        while ( (messages.size() < context.getProperty(BATCH_SIZE).asInteger())
    +                && (queryTimeout == 0 || System.currentTimeMillis() - startTime < queryTimeout ) ) {
    +            messages.add(consumer.receive());
    +        }
    +
    +        return messages;
    +    }
    +
    +    private void processMessages(ProcessContext context, ProcessSession session, List<Message> messages, RecordReaderFactory readerFactory,
    +        RecordSetWriterFactory writerFactory, boolean async) throws PulsarClientException {
    +
    +        if (messages.isEmpty())
    +           return;
    +
    +        final AtomicLong messagesReceived = new AtomicLong(0L);
    +
    +        final BiConsumer<Message, Exception> handleParseFailure = (msg, e) -> {
    +            FlowFile failureFlowFile = session.create();
    +            if (msg.getData() != null) {
    +               failureFlowFile = session.write(failureFlowFile, out -> out.write(msg.getData()));
    +            }
    +            session.transfer(failureFlowFile, REL_PARSE_FAILURE);
    +        };
    +
    +        RecordSetWriter writer = null;
    +        FlowFile flowFile = null;
    +        OutputStream rawOut = null;
    +
    +        for (Message msg: messages)  {
    +            RecordReader reader = getRecordReader(msg, readerFactory, handleParseFailure);
    +            Record firstRecord = getFirstRecord(msg, reader, handleParseFailure);
    +
    +            if (firstRecord == null) {
    +                // If the message doesn't contain any record, just ack the message
    +                if (async) {
    +                   ackService.submit(new Callable<Void>() {
    +                      @Override
    +                      public Void call() throws Exception {
    +                         return getWrappedConsumer(context).getConsumer().acknowledgeAsync(msg).get();
    +                      }
    +                   });
    +                } else {
    +                   getWrappedConsumer(context).getConsumer().acknowledge(msg);
    +                }
    +
    +                continue;
    +            }
    +
    +            // Session / FlowFile starts here
    +            if (flowFile == null) {
    +                flowFile = session.create();
    +                rawOut = session.write(flowFile);
    +            }
    +
    +            // Create the Record Writer
    +            if (writer == null) {
    +                try {
    +                    writer = getRecordWriter(writerFactory, firstRecord.getSchema(), rawOut);
    +                    writer.beginRecordSet();
    +                } catch (SchemaNotFoundException | IOException ex) {
    +                    getLogger().error("Failed to obtain Schema for FlowFile.", ex);
    +                    throw new ProcessException(ex);
    +                }
    +            }
    +
    +            // Read all the records from this message, as it may contain several
    +            try {
    +                for (Record record = firstRecord; record != null; record = reader.nextRecord()) {
    +                    writer.write(record);
    +                    messagesReceived.incrementAndGet();
    +                }
    +            } catch (MalformedRecordException | IOException mEx) {
    +                handleParseFailure.accept(msg, mEx);
    +            }
    +
    +            // Acknowledge the message
    +            if (async) {
    +               ackService.submit(new Callable<Void>() {
    +                   @Override
    +                   public Void call() throws Exception {
    +                      return getWrappedConsumer(context).getConsumer().acknowledgeAsync(msg).get();
    +                   }
    +               });
    +            } else {
    +                  getWrappedConsumer(context).getConsumer().acknowledge(msg);
    +            }
    +        }
    +
    +        // Clean-up and transfer session
    +        try {
    +            if (writer != null)
    +               writer.finishRecordSet();
    +
    +            if (rawOut != null)
    +              rawOut.close();
    +        } catch (IOException e1) {
    +            getLogger().error("Error cleaning up", e1);
    +        }
    +
    +        if (flowFile != null) {
    +           flowFile = session.putAttribute(flowFile, MSG_COUNT, messagesReceived.toString());
    +           session.transfer(flowFile, REL_SUCCESS);
    --- End diff --
    
    Missing provenance reporting.
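
    Roughly what ConsumePulsar_1_X already does before its transfer, e.g. (a sketch, reusing the consumer wrapper's transit URL from elsewhere in this PR):

        // Record a RECEIVE provenance event so the FlowFile's origin is traceable
        session.getProvenanceReporter().receive(flowFile,
                "From " + getWrappedConsumer(context).getTransitURL());
        session.transfer(flowFile, REL_SUCCESS);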


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    Something is a bit wonky with this latest push, perhaps a bad merge?


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio closed the pull request at:

    https://github.com/apache/nifi/pull/2614


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    @david-streamlio your PR appears to delete the nifi-grpc directory... You will want to avoid doing that.
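
    If the deletion is already baked into the branch, one way to bring the directory back (assuming the ASF repo is the `upstream` remote) is:

        git checkout upstream/master -- nifi-nar-bundles/nifi-grpc-bundle
        git commit -m "Restore nifi-grpc-bundle"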


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    Yea, I'm not sure what happened
    
    First I did the following 4 steps you recommended:
    
    As a rule of thumb, this is how you want to do this sort of update:
    
    git checkout master
    git pull upstream master
    git checkout YOUR_BRANCH
    git rebase master
    
    Then I checked out my branch, made the changes, and ran `mvn -Pcontrib-check clean install` successfully. When I tried to do a git push, I got the following error:
    
    Davids-MacBook-Pro:nifi david$ git push origin
    To https://github.com/david-streamlio/nifi.git
     ! [rejected]            NIFI-4914 -> NIFI-4914 (non-fast-forward)
    error: failed to push some refs to 'https://github.com/david-streamlio/nifi.git'
    hint: Updates were rejected because the tip of your current branch is behind
    hint: its remote counterpart. Integrate the remote changes (e.g.
    hint: 'git pull ...') before pushing again.
    
    
    So, I fixed the conflicts, committed them and pushed again. 


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    Bingo! That's what you should be seeing now. Fix the merge conflicts, `git add` the files and continue (and repeat until it's done)


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183235146
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java ---
    @@ -0,0 +1,294 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.pulsar.client.api.MessageId;
    +import org.apache.pulsar.client.api.Producer;
    +
    +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"})
    +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. "
    +    + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
    +    + "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to "
    +        + "FlowFiles that are routed to success.")
    +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class})
    +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor {
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(RECORD_READER);
    +        properties.add(RECORD_WRITER);
    +        properties.add(TOPIC);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(BATCHING_ENABLED);
    +        properties.add(BATCHING_MAX_MESSAGES);
    +        properties.add(BATCH_INTERVAL);
    +        properties.add(BLOCK_IF_QUEUE_FULL);
    +        properties.add(COMPRESSION_TYPE);
    +        properties.add(MESSAGE_ROUTING_MODE);
    +        properties.add(PENDING_MAX_MESSAGES);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        if (StringUtils.isBlank(topic)) {
    +            getLogger().error("Invalid topic specified {}", new Object[] {topic});
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        // Read the contents of the FlowFile into a byte array
    +        final byte[] messageContent = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, messageContent, true);
    +            }
    +        });
    +
    +        // Nothing to do, so skip this Flow file.
    +        if (messageContent == null || messageContent.length < 1) {
    +            session.transfer(flowFile, REL_SUCCESS);
    +            return;
    +        }
    +
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER)
    +                .asControllerService(RecordReaderFactory.class);
    +
    +        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER)
    +                    .asControllerService(RecordSetWriterFactory.class);
    +
    +        final Map<String, String> attributes = flowFile.getAttributes();
    +        final AtomicLong messagesSent = new AtomicLong(0L);
    +
    +        try {
    +            final InputStream in = new ByteArrayInputStream(messageContent);
    +            final RecordReader reader = readerFactory.createRecordReader(attributes, in, getLogger());
    +            final RecordSet recordSet = reader.createRecordSet();
    +            final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());
    +            final Producer producer = getWrappedProducer(topic, context).getProducer();
    +
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +               InFlightMessageMonitor bundle = getInFlightMessages(writerFactory, schema, recordSet);
    +               this.sendAsync(producer, session, flowFile, bundle);
    +               handleAsync(bundle, session, flowFile, topic);
    +           } else {
    +               messagesSent.addAndGet(send(producer, writerFactory, schema, recordSet));
    +               session.putAttribute(flowFile, MSG_COUNT, messagesSent.get() + "");
    +               session.putAttribute(flowFile, TOPIC_NAME, topic);
    +               session.adjustCounter("Messages Sent", messagesSent.get(), true);
    +               session.getProvenanceReporter().send(flowFile, "Sent " + messagesSent.get() + " records to " + topic );
    +               session.transfer(flowFile, REL_SUCCESS);
    +           }
    +        } catch (final SchemaNotFoundException | MalformedRecordException | IOException e) {
    +            session.transfer(flowFile, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    private int send(final Producer producer, final RecordSetWriterFactory writerFactory, final RecordSchema schema, final RecordSet recordSet) throws IOException, SchemaNotFoundException {
    +
    +        final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
    +
    +        Record record;
    +        int recordCount = 0;
    +
    +        while ((record = recordSet.next()) != null) {
    +            recordCount++;
    +            baos.reset();
    +
    +            try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, baos)) {
    +                writer.write(record);
    +                writer.flush();
    +            }
    +
    +            producer.send(baos.toByteArray());
    +        }
    +
    +        return recordCount;
    +    }
    +
    +    private InFlightMessageMonitor getInFlightMessages(RecordSetWriterFactory writerFactory, RecordSchema schema, RecordSet recordSet) throws IOException, SchemaNotFoundException {
    +        ArrayList<byte[]> records = new ArrayList<byte[]>();
    +        final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
    +
    +        Record record;
    +
    +        while ((record = recordSet.next()) != null) {
    +            baos.reset();
    +
    +            try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, baos)) {
    +                writer.write(record);
    +                writer.flush();
    +            }
    +            records.add(baos.toByteArray());
    +        }
    +
    +        return new InFlightMessageMonitor(records);
    +    }
    +
    +    /* Launches all of the async send requests
    +     *
    +     */
    +    protected void sendAsync(Producer producer, ProcessSession session, FlowFile flowFile, InFlightMessageMonitor monitor) {
    +
    +        if (monitor == null || monitor.getRecords().isEmpty())
    +           return;
    +
    +        for (byte[] record: monitor.getRecords() ) {
    +           try {
    +
    +              publisherService.submit(new Callable<MessageId>() {
    +                @Override
    +                public MessageId call() throws Exception {
    +                  try {
    +                     return producer.sendAsync(record).handle((msgId, ex) -> {
    +                         if (msgId != null) {
    +                            monitor.getSuccessCounter().incrementAndGet();
    +                            return msgId;
    +                         } else {
    +                            monitor.getFailureCounter().incrementAndGet();
    +                            monitor.getFailures().add(record);
    +                            return null;
    +                         }
    +                     }).get();
    +
    +                   } catch (final Throwable t) {
    +                      // This traps any exceptions thrown while calling the producer.sendAsync() method.
    +                      monitor.getFailureCounter().incrementAndGet();
    +                      monitor.getFailures().add(record);
    +                      return null;
    +                   } finally {
    +                      monitor.getLatch().countDown();
    +                   }
    +               }
    +             });
    +          } catch (final RejectedExecutionException ex) {
    +            // This can happen if the processor is being Unscheduled.
    +          }
    +       }
    +    }
    +
    +    private void handleAsync(InFlightMessageMonitor monitor, ProcessSession session, FlowFile flowFile, String topic) {
    +       try {
    +
    +           boolean useOriginalForFailures = false;
    +           monitor.getLatch().await();
    +
    +           if (monitor.getSuccessCounter().intValue() > 0) {
    +               session.putAttribute(flowFile, MSG_COUNT, monitor.getSuccessCounter().get() + "");
    +               session.putAttribute(flowFile, TOPIC_NAME, topic);
    +               session.adjustCounter("Messages Sent", monitor.getSuccessCounter().get(), true);
    +               session.getProvenanceReporter().send(flowFile, "Sent " + monitor.getSuccessCounter().get() + " records to " + topic );
    +               session.transfer(flowFile, REL_SUCCESS);
    +           } else {
    +              // Route the original FlowFile to failure, otherwise we need to create a new FlowFile for the Failure relationship
    +              useOriginalForFailures = true;
    +           }
    +
    +           if (monitor.getFailureCounter().intValue() > 0) {
    +              // Create a second flow file for failures.
    +              FlowFile failureFlowFile = useOriginalForFailures ? flowFile : session.create();
    +
    +              StringBuffer sb = new StringBuffer();
    +              for (byte[] badRecord : monitor.getFailures()) {
    +                 sb.append(new String(badRecord)).append(System.lineSeparator());
    +              }
    +
    +              failureFlowFile = session.write(failureFlowFile, out -> out.write(sb.toString().trim().getBytes()));
    +              session.putAttribute(failureFlowFile, MSG_COUNT, monitor.getFailureCounter().get() + "");
    --- End diff --
    
    `String.valueOf(monitor.getFailureCounter().get())` would be better.
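
    It reads as an explicit conversion instead of relying on the `+ ""` concatenation idiom used elsewhere in the method.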


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183234838
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +
    +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@CapabilityDescription("Consumes messages from Apache Pulsar "
    +        + "The complementary NiFi processor for sending messages is PublishPulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor {
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +        properties.add(MAX_WAIT_TIME);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        try {
    +            if (context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +                // Launch consumers
    +                consumeAsync(context, session);
    +
    +                // Handle completed consumers
    +                handleAsync(context, session);
    +
    +            } else {
    +                consume(context, session);
    +            }
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +
    +    }
    +
    +    private void handleAsync(ProcessContext context, ProcessSession session) {
    +
    +        try {
    +            Future<Message> done = consumerService.take();
    +            Message msg = done.get();
    +
    +            if (msg != null) {
    +                FlowFile flowFile = null;
    +                final byte[] value = msg.getData();
    +                if (value != null && value.length > 0) {
    +                    flowFile = session.create();
    +                    flowFile = session.write(flowFile, out -> {
    +                        out.write(value);
    +                    });
    +
    +                   session.getProvenanceReporter().receive(flowFile, "From " + getWrappedConsumer(context).getTransitURL());
    +                   session.transfer(flowFile, REL_SUCCESS);
    +                   session.commit();
    +                   getWrappedConsumer(context).getConsumer().acknowledgeAsync(msg);
    +                }
    +            }
    +
    +        } catch (InterruptedException | ExecutionException | PulsarClientException e) {
    +            getLogger().error("Trouble consuming messages ", e);
    +        }
    +
    +    }
    +
    +    @OnStopped
    +    public void close(final ProcessContext context) {
    +
    +        getLogger().info("Disconnecting Pulsar Consumer");
    +        if (consumer != null) {
    +
    +            context.getProperty(PULSAR_CLIENT_SERVICE)
    +                .asControllerService(PulsarClientPool.class)
    +                .getConsumerPool().evict(consumer);
    +        }
    +
    +        consumer = null;
    +    }
    +
    +    /*
    +     * When this Processor expects to receive many small files, it may
    +     * be advisable to create several FlowFiles from a single session
    +     * before committing the session. Typically, this allows the Framework
    +     * to treat the content of the newly created FlowFiles much more efficiently.
    +     */
    +    private void consume(ProcessContext context, ProcessSession session) throws PulsarClientException {
    +
    +        Consumer consumer = getWrappedConsumer(context).getConsumer();
    +
    +        final ComponentLog logger = getLogger();
    +        final Message msg;
    +        FlowFile flowFile = null;
    +
    +        try {
    +
    +            msg = consumer.receive();
    +            final byte[] value = msg.getData();
    +
    +            if (value != null && value.length > 0) {
    +                flowFile = session.create();
    +                flowFile = session.write(flowFile, out -> {
    +                    out.write(value);
    +                });
    +
    +                session.getProvenanceReporter().receive(flowFile, "From " + context.getProperty(TOPIC).getValue());
    +                session.transfer(flowFile, REL_SUCCESS);
    +                logger.info("Created {} from {} messages received from Pulsar Server and transferred to 'success'",
    +                        new Object[]{flowFile, 1});
    +
    +                session.commit();
    +
    +                /*
    +                 * This Processor acknowledges receipt of the data and/or removes the data
    +                 * from the external source in order to prevent receipt of duplicate files.
    +                 * This is done only after the ProcessSession by which the FlowFile was created
    +                 * has been committed! Failure to adhere to this principle may result in data
    +                 * loss, as restarting NiFi before the session has been committed will result
    +                 * in the temporary file being deleted. Note, however, that it is possible using
    +                 * this approach to receive duplicate data because the application could be
    +                 * restarted after committing the session and before acknowledging or removing
    +                 * the data from the external source. In general, though, potential data duplication
    +                 * is preferred over potential data loss.
    +                 */
    +                getLogger().info("Acknowledging message " + msg.getMessageId());
    --- End diff --
    
    Needs to be wrapped as well.
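    
    Presumably the same guard-and-debug treatment as suggested for the other log statements, e.g. (a minimal sketch):
    
        if (getLogger().isDebugEnabled()) {
            getLogger().debug("Acknowledging message " + msg.getMessageId());
        }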


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    Ok, I'll take a look. If worst comes to worst, we cherry-pick the heck out of your commits and play a game of hacky sack with Git to get you back on track.


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r180329019
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-nar/pom.xml ---
    @@ -0,0 +1,50 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-pulsar-bundle</artifactId>
    +        <version>1.7.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-pulsar-nar</artifactId>
    +    <packaging>nar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-pulsar-client-service-api-nar</artifactId>
    +            <version>1.6.0-SNAPSHOT</version>
    +            <type>nar</type>
    +		</dependency>
    +		
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-pulsar-processors</artifactId>
    +            <version>1.6.0-SNAPSHOT</version>
    --- End diff --
    
    Needs to be updated to 1.7.0-SNAPSHOT


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183235215
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/RecordBasedConst.java ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +
    +public final class RecordBasedConst {
    +
    +    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("The Record Reader to use for incoming FlowFiles")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .expressionLanguageSupported(false)
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
    +            .name("record-writer")
    +            .displayName("Record Writer")
    +            .description("The Record Writer to use in order to serialize the data before sending to Pulsar")
    +            .identifiesControllerService(RecordSetWriterFactory.class)
    +            .expressionLanguageSupported(false)
    +            .required(true)
    +            .build();
    +
    +    private RecordBasedConst() {
    --- End diff --
    
    Not needed.


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183235065
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java ---
    @@ -0,0 +1,294 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.pulsar.client.api.MessageId;
    +import org.apache.pulsar.client.api.Producer;
    +
    +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"})
    +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. "
    +    + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
    +    + "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to "
    +        + "FlowFiles that are routed to success.")
    +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class})
    +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor {
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(RECORD_READER);
    +        properties.add(RECORD_WRITER);
    +        properties.add(TOPIC);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(BATCHING_ENABLED);
    +        properties.add(BATCHING_MAX_MESSAGES);
    +        properties.add(BATCH_INTERVAL);
    +        properties.add(BLOCK_IF_QUEUE_FULL);
    +        properties.add(COMPRESSION_TYPE);
    +        properties.add(MESSAGE_ROUTING_MODE);
    +        properties.add(PENDING_MAX_MESSAGES);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        if (StringUtils.isBlank(topic)) {
    +            getLogger().error("Invalid topic specified {}", new Object[] {topic});
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        // Read the contents of the FlowFile into a byte array
    +        final byte[] messageContent = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, messageContent, true);
    +            }
    +        });
    +
    +        // Nothing to do, so skip this Flow file.
    +        if (messageContent == null || messageContent.length < 1) {
    +            session.transfer(flowFile, REL_SUCCESS);
    +            return;
    +        }
    +
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER)
    +                .asControllerService(RecordReaderFactory.class);
    +
    +        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER)
    +                    .asControllerService(RecordSetWriterFactory.class);
    +
    +        final Map<String, String> attributes = flowFile.getAttributes();
    +        final AtomicLong messagesSent = new AtomicLong(0L);
    +
    +        try {
    +            final InputStream in = new ByteArrayInputStream(messageContent);
    +            final RecordReader reader = readerFactory.createRecordReader(attributes, in, getLogger());
    +            final RecordSet recordSet = reader.createRecordSet();
    +            final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());
    +            final Producer producer = getWrappedProducer(topic, context).getProducer();
    +
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +               InFlightMessageMonitor bundle = getInFlightMessages(writerFactory, schema, recordSet);
    +               this.sendAsync(producer, session, flowFile, bundle);
    +               handleAsync(bundle, session, flowFile, topic);
    +           } else {
    +               messagesSent.addAndGet(send(producer, writerFactory, schema, recordSet));
    +               session.putAttribute(flowFile, MSG_COUNT, messagesSent.get() + "");
    --- End diff --
    
    These should be `flowFile = session.putAttribute(...)`. Otherwise, AFAIK, the new attribute is going to be dropped, since FlowFiles are immutable and `putAttribute` returns an updated copy.
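    
    A minimal sketch over the affected lines from the diff:
    
        flowFile = session.putAttribute(flowFile, MSG_COUNT, String.valueOf(messagesSent.get()));
        flowFile = session.putAttribute(flowFile, TOPIC_NAME, topic);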


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    To provide more details here: after the release of 1.6.0 (with #2205), we deprecated the use of ``expressionLanguageSupported(boolean)`` in favor of ``expressionLanguageSupported(ExpressionLanguageScope)``. Using the deprecated method does not break anything in the code itself; the only difference is in the test framework, which is now stricter about checking that the appropriate scope has been set on the property descriptor. If it has not, the unit test will fail.
    
    Most of the existing open PRs that introduce new properties supporting expression language without using the new scope may therefore see unit test failures. The only thing required is to update each such property with the proper scope.
    
    Hope this clarifies things a bit.
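    
    For example, a minimal sketch of a property descriptor updated to the new API (the property name and description here are illustrative, not taken from this PR):
    
        import org.apache.nifi.components.PropertyDescriptor;
        import org.apache.nifi.expression.ExpressionLanguageScope;
        import org.apache.nifi.processor.util.StandardValidators;
    
        // The scope must match how the property is actually evaluated:
        // FLOWFILE_ATTRIBUTES if evaluateAttributeExpressions(flowFile) is called,
        // VARIABLE_REGISTRY if only the variable registry is consulted, NONE otherwise.
        public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
                .name("topic")
                .displayName("Topic Name")
                .description("The Pulsar topic to publish to.")
                .required(true)
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                .build();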


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    Should I run the rebase against my 'master' branch or my 'NIFI-4914' branch?


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    I would personally rework this thing so that it gets back to just the commit(s) meant to be included: start fresh off the latest master, apply the actual changes intended, and end up with a single commit, then either force-push that or open a different PR. This one is tough to follow as-is, but it seems like a good/cool/useful PR.


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183235123
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java ---
    @@ -0,0 +1,294 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.pulsar.client.api.MessageId;
    +import org.apache.pulsar.client.api.Producer;
    +
    +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"})
    +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. "
    +    + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
    +    + "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to "
    +        + "FlowFiles that are routed to success.")
    +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class})
    +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor {
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(RECORD_READER);
    +        properties.add(RECORD_WRITER);
    +        properties.add(TOPIC);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(BATCHING_ENABLED);
    +        properties.add(BATCHING_MAX_MESSAGES);
    +        properties.add(BATCH_INTERVAL);
    +        properties.add(BLOCK_IF_QUEUE_FULL);
    +        properties.add(COMPRESSION_TYPE);
    +        properties.add(MESSAGE_ROUTING_MODE);
    +        properties.add(PENDING_MAX_MESSAGES);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        if (StringUtils.isBlank(topic)) {
    +            getLogger().error("Invalid topic specified {}", new Object[] {topic});
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        // Read the contents of the FlowFile into a byte array
    +        final byte[] messageContent = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, messageContent, true);
    +            }
    +        });
    +
    +        // Nothing to do, so skip this Flow file.
    +        if (messageContent == null || messageContent.length < 1) {
    +            session.transfer(flowFile, REL_SUCCESS);
    +            return;
    +        }
    +
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER)
    +                .asControllerService(RecordReaderFactory.class);
    +
    +        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER)
    +                    .asControllerService(RecordSetWriterFactory.class);
    +
    +        final Map<String, String> attributes = flowFile.getAttributes();
    +        final AtomicLong messagesSent = new AtomicLong(0L);
    +
    +        try {
    +            final InputStream in = new ByteArrayInputStream(messageContent);
    +            final RecordReader reader = readerFactory.createRecordReader(attributes, in, getLogger());
    +            final RecordSet recordSet = reader.createRecordSet();
    +            final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());
    +            final Producer producer = getWrappedProducer(topic, context).getProducer();
    +
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +               InFlightMessageMonitor bundle = getInFlightMessages(writerFactory, schema, recordSet);
    +               this.sendAsync(producer, session, flowFile, bundle);
    +               handleAsync(bundle, session, flowFile, topic);
    +           } else {
    +               messagesSent.addAndGet(send(producer, writerFactory, schema, recordSet));
    +               session.putAttribute(flowFile, MSG_COUNT, messagesSent.get() + "");
    +               session.putAttribute(flowFile, TOPIC_NAME, topic);
    +               session.adjustCounter("Messages Sent", messagesSent.get(), true);
    +               session.getProvenanceReporter().send(flowFile, "Sent " + messagesSent.get() + " records to " + topic );
    +               session.transfer(flowFile, REL_SUCCESS);
    +           }
    +        } catch (final SchemaNotFoundException | MalformedRecordException | IOException e) {
    +            session.transfer(flowFile, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    private int send(final Producer producer, final RecordSetWriterFactory writerFactory, final RecordSchema schema, final RecordSet recordSet) throws IOException, SchemaNotFoundException {
    +
    +        final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
    +
    +        Record record;
    +        int recordCount = 0;
    +
    +        while ((record = recordSet.next()) != null) {
    +            recordCount++;
    +            baos.reset();
    +
    +            try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, baos)) {
    +                writer.write(record);
    +                writer.flush();
    +            }
    +
    +            producer.send(baos.toByteArray());
    +        }
    +
    +        return recordCount;
    +    }
    +
    +    private InFlightMessageMonitor getInFlightMessages(RecordSetWriterFactory writerFactory, RecordSchema schema, RecordSet recordSet) throws IOException, SchemaNotFoundException {
    +        ArrayList<byte[]> records = new ArrayList<byte[]>();
    +        final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
    +
    +        Record record;
    +
    +        while ((record = recordSet.next()) != null) {
    +            baos.reset();
    +
    +            try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, baos)) {
    +                writer.write(record);
    +                writer.flush();
    +            }
    +            records.add(baos.toByteArray());
    +        }
    +
    +        return new InFlightMessageMonitor(records);
    +    }
    +
    +    /* Launches all of the async send requests
    +     *
    +     */
    +    protected void sendAsync(Producer producer, ProcessSession session, FlowFile flowFile, InFlightMessageMonitor monitor) {
    +
    +        if (monitor == null || monitor.getRecords().isEmpty())
    +           return;
    +
    +        for (byte[] record: monitor.getRecords() ) {
    +           try {
    +
    +              publisherService.submit(new Callable<MessageId>() {
    +                @Override
    +                public MessageId call() throws Exception {
    +                  try {
    +                     return producer.sendAsync(record).handle((msgId, ex) -> {
    +                         if (msgId != null) {
    +                            monitor.getSuccessCounter().incrementAndGet();
    +                            return msgId;
    +                         } else {
    +                            monitor.getFailureCounter().incrementAndGet();
    +                            monitor.getFailures().add(record);
    +                            return null;
    +                         }
    +                     }).get();
    +
    +                   } catch (final Throwable t) {
    +                      // This traps any exceptions thrown while calling the producer.sendAsync() method.
    +                      monitor.getFailureCounter().incrementAndGet();
    +                      monitor.getFailures().add(record);
    +                      return null;
    +                   } finally {
    +                      monitor.getLatch().countDown();
    +                   }
    +               }
    +             });
    +          } catch (final RejectedExecutionException ex) {
    +            // This can happen if the processor is being Unscheduled.
    +          }
    +       }
    +    }
    +
    +    private void handleAsync(InFlightMessageMonitor monitor, ProcessSession session, FlowFile flowFile, String topic) {
    +       try {
    +
    +           boolean useOriginalForFailures = false;
    +           monitor.getLatch().await();
    +
    +           if (monitor.getSuccessCounter().intValue() > 0) {
    +               session.putAttribute(flowFile, MSG_COUNT, monitor.getSuccessCounter().get() + "");
    --- End diff --
    
    `flowFile = session.putAttribute`


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183235109
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java ---
    @@ -0,0 +1,294 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.pulsar.client.api.MessageId;
    +import org.apache.pulsar.client.api.Producer;
    +
    +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"})
    +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. "
    +    + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
    +    + "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to "
    +        + "FlowFiles that are routed to success.")
    +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class})
    +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor {
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(RECORD_READER);
    +        properties.add(RECORD_WRITER);
    +        properties.add(TOPIC);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(BATCHING_ENABLED);
    +        properties.add(BATCHING_MAX_MESSAGES);
    +        properties.add(BATCH_INTERVAL);
    +        properties.add(BLOCK_IF_QUEUE_FULL);
    +        properties.add(COMPRESSION_TYPE);
    +        properties.add(MESSAGE_ROUTING_MODE);
    +        properties.add(PENDING_MAX_MESSAGES);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        if (StringUtils.isBlank(topic)) {
    +            getLogger().error("Invalid topic specified {}", new Object[] {topic});
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        // Read the contents of the FlowFile into a byte array
    +        final byte[] messageContent = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, messageContent, true);
    +            }
    +        });
    +
    +        // Nothing to do, so skip this Flow file.
    +        if (messageContent == null || messageContent.length < 1) {
    +            session.transfer(flowFile, REL_SUCCESS);
    +            return;
    +        }
    +
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER)
    +                .asControllerService(RecordReaderFactory.class);
    +
    +        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER)
    +                    .asControllerService(RecordSetWriterFactory.class);
    +
    +        final Map<String, String> attributes = flowFile.getAttributes();
    +        final AtomicLong messagesSent = new AtomicLong(0L);
    +
    +        try {
    +            final InputStream in = new ByteArrayInputStream(messageContent);
    +            final RecordReader reader = readerFactory.createRecordReader(attributes, in, getLogger());
    +            final RecordSet recordSet = reader.createRecordSet();
    +            final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());
    +            final Producer producer = getWrappedProducer(topic, context).getProducer();
    +
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +               InFlightMessageMonitor bundle = getInFlightMessages(writerFactory, schema, recordSet);
    +               this.sendAsync(producer, session, flowFile, bundle);
    +               handleAsync(bundle, session, flowFile, topic);
    +           } else {
    +               messagesSent.addAndGet(send(producer, writerFactory, schema, recordSet));
    +               session.putAttribute(flowFile, MSG_COUNT, messagesSent.get() + "");
    +               session.putAttribute(flowFile, TOPIC_NAME, topic);
    +               session.adjustCounter("Messages Sent", messagesSent.get(), true);
    +               session.getProvenanceReporter().send(flowFile, "Sent " + messagesSent.get() + " records to " + topic );
    +               session.transfer(flowFile, REL_SUCCESS);
    +           }
    +        } catch (final SchemaNotFoundException | MalformedRecordException | IOException e) {
    +            session.transfer(flowFile, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    private int send(final Producer producer, final RecordSetWriterFactory writerFactory, final RecordSchema schema, final RecordSet recordSet) throws IOException, SchemaNotFoundException {
    +
    +        final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
    +
    +        Record record;
    +        int recordCount = 0;
    +
    +        while ((record = recordSet.next()) != null) {
    +            recordCount++;
    +            baos.reset();
    +
    +            try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, baos)) {
    +                writer.write(record);
    +                writer.flush();
    +            }
    +
    +            producer.send(baos.toByteArray());
    +        }
    +
    +        return recordCount;
    +    }
    +
    +    private InFlightMessageMonitor getInFlightMessages(RecordSetWriterFactory writerFactory, RecordSchema schema, RecordSet recordSet) throws IOException, SchemaNotFoundException {
    +        ArrayList<byte[]> records = new ArrayList<byte[]>();
    +        final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
    +
    +        Record record;
    +
    +        while ((record = recordSet.next()) != null) {
    +            baos.reset();
    +
    +            try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, baos)) {
    +                writer.write(record);
    +                writer.flush();
    +            }
    +            records.add(baos.toByteArray());
    +        }
    +
    +        return new InFlightMessageMonitor(records);
    +    }
    +
    +    /* Launches all of the async send requests
    +     *
    +     */
    +    protected void sendAsync(Producer producer, ProcessSession session, FlowFile flowFile, InFlightMessageMonitor monitor) {
    +
    +        if (monitor == null || monitor.getRecords().isEmpty())
    --- End diff --
    
    Should have curly brackets.
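    
    i.e. (sketch):
    
        if (monitor == null || monitor.getRecords().isEmpty()) {
            return;
        }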


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183234826
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +
    +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@CapabilityDescription("Consumes messages from Apache Pulsar "
    +        + "The complementary NiFi processor for sending messages is PublishPulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor {
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +        properties.add(MAX_WAIT_TIME);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        try {
    +            if (context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +                // Launch consumers
    +                consumeAsync(context, session);
    +
    +                // Handle completed consumers
    +                handleAsync(context, session);
    +
    +            } else {
    +                consume(context, session);
    +            }
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +
    +    }
    +
    +    private void handleAsync(ProcessContext context, ProcessSession session) {
    +
    +        try {
    +            Future<Message> done = consumerService.take();
    +            Message msg = done.get();
    +
    +            if (msg != null) {
    +                FlowFile flowFile = null;
    +                final byte[] value = msg.getData();
    +                if (value != null && value.length > 0) {
    +                    flowFile = session.create();
    +                    flowFile = session.write(flowFile, out -> {
    +                        out.write(value);
    +                    });
    +
    +                   session.getProvenanceReporter().receive(flowFile, "From " + getWrappedConsumer(context).getTransitURL());
    +                   session.transfer(flowFile, REL_SUCCESS);
    +                   session.commit();
    +                   getWrappedConsumer(context).getConsumer().acknowledgeAsync(msg);
    +                }
    +            }
    +
    +        } catch (InterruptedException | ExecutionException | PulsarClientException e) {
    +            getLogger().error("Trouble consuming messages ", e);
    +        }
    +
    +    }
    +
    +    @OnStopped
    +    public void close(final ProcessContext context) {
    +
    +        getLogger().info("Disconnecting Pulsar Consumer");
    +        if (consumer != null) {
    +
    +            context.getProperty(PULSAR_CLIENT_SERVICE)
    +                .asControllerService(PulsarClientPool.class)
    +                .getConsumerPool().evict(consumer);
    +        }
    +
    +        consumer = null;
    +    }
    +
    +    /*
    +     * When this Processor expects to receive many small files, it may
    +     * be advisable to create several FlowFiles from a single session
    +     * before committing the session. Typically, this allows the Framework
    +     * to treat the content of the newly created FlowFiles much more efficiently.
    +     */
    +    private void consume(ProcessContext context, ProcessSession session) throws PulsarClientException {
    +
    +        Consumer consumer = getWrappedConsumer(context).getConsumer();
    +
    +        final ComponentLog logger = getLogger();
    +        final Message msg;
    +        FlowFile flowFile = null;
    +
    +        try {
    +
    +            msg = consumer.receive();
    +            final byte[] value = msg.getData();
    +
    +            if (value != null && value.length > 0) {
    +                flowFile = session.create();
    +                flowFile = session.write(flowFile, out -> {
    +                    out.write(value);
    +                });
    +
    +                session.getProvenanceReporter().receive(flowFile, "From " + context.getProperty(TOPIC).getValue());
    +                session.transfer(flowFile, REL_SUCCESS);
    +                logger.info("Created {} from {} messages received from Pulsar Server and transferred to 'success'",
    --- End diff --
    
    Let's do this at debug level and wrap it with an if statement, because this will get called a lot.
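    
    For example, a minimal sketch (the message text and argument array here are illustrative, not the exact line from the PR):
    
        if (logger.isDebugEnabled()) {
            logger.debug("Created {} from message received from Pulsar Server and transferred to 'success'",
                    new Object[] {flowFile});
        }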


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183209099
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.pulsar.client.api.ClientConfiguration;
    +import org.apache.pulsar.client.api.PulsarClient;
    +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
    +import org.apache.pulsar.client.impl.auth.AuthenticationTls;
    +
    +@Tags({ "Pulsar"})
    +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.")
    +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool {
    +
    +    public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor
    +            .Builder().name("PULSAR_SERVICE_URL")
    +            .displayName("Pulsar Service URL")
    +            .description("URL for the Pulsar cluster, e.g localhost:6650")
    +            .required(true)
    +            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum concurrent lookup-requests")
    +            .description("Number of concurrent lookup-requests allowed on each broker-connection to prevent "
    +                    + "overload on broker. (default: 5000) It should be configured with higher value only in case "
    +                    + "of it requires to produce/subscribe on thousands of topics")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder()
    +            .name("Maximum connects per Pulsar broker")
    +            .description("Sets the max number of connection that the client library will open to a single broker.\n" +
    +                    "By default, the connection pool will use a single connection for all the producers and consumers. " +
    +                    "Increasing this parameter may improve throughput when using many producers over a high latency connection")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    --- End diff --
    
    > Consider adding expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).
    
    In fact, you might want to consider that for all of these properties so you can make it more customizable.
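    
    As a sketch, reusing the CONNECTIONS_PER_BROKER descriptor from above (this assumes org.apache.nifi.expression.ExpressionLanguageScope is imported; the only change is the added expressionLanguageSupported line, and the description is abbreviated here):
    
        public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder()
                .name("Maximum connects per Pulsar broker")
                .description("Sets the max number of connections that the client library will open to a single broker.")
                .required(false)
                .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
                .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
                .defaultValue("1")
                .build();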


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r180328774
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/pom.xml ---
    @@ -0,0 +1,67 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-pulsar-bundle</artifactId>
    +        <version>1.7.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-pulsar-client-service</artifactId>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-pulsar-client-service-api</artifactId>
    +            <version>1.6.0-SNAPSHOT</version>
    --- End diff --
    
    Needs to be updated to 1.7.0-SNAPSHOT


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r180328802
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/pom.xml ---
    @@ -0,0 +1,67 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-pulsar-bundle</artifactId>
    +        <version>1.7.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-pulsar-client-service</artifactId>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-pulsar-client-service-api</artifactId>
    +            <version>1.6.0-SNAPSHOT</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-api</artifactId>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-processor-utils</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-ssl-context-service-api</artifactId>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-mock</artifactId>
    +            <version>1.6.0-SNAPSHOT</version>
    --- End diff --
    
    Needs to be updated to 1.7.0-SNAPSHOT


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r180328729
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-nar/pom.xml ---
    @@ -0,0 +1,35 @@
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +  <modelVersion>4.0.0</modelVersion>
    +  
    +  <parent>
    +    <groupId>org.apache.nifi</groupId>
    +    <artifactId>nifi-pulsar-bundle</artifactId>
    +    <version>1.7.0-SNAPSHOT</version>
    +  </parent>
    +  
    +  <artifactId>nifi-pulsar-client-service-api-nar</artifactId>
    +  <packaging>nar</packaging>
    +  
    +  <dependencies>
    +  	<dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-pulsar-client-service-api</artifactId>
    +            <version>1.6.0-SNAPSHOT</version>
    --- End diff --
    
    Needs to be updated to 1.7.0-SNAPSHOT


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r180328937
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-nar/pom.xml ---
    @@ -0,0 +1,50 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-pulsar-bundle</artifactId>
    +        <version>1.7.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-pulsar-nar</artifactId>
    +    <packaging>nar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-pulsar-client-service-api-nar</artifactId>
    +            <version>1.6.0-SNAPSHOT</version>
    --- End diff --
    
    Needs to be updated to 1.7.0-SNAPSHOT, and the indentation needs to be fixed.


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183233581
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.function.BiConsumer;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +
    +@CapabilityDescription("Consumes messages from Apache Pulsar specifically built against the Pulsar 1.x Consumer API. "
    +        + "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_0. Please note that, at this time, "
    +        + "the Processor assumes that all records that are retrieved from a given partition have the same schema. If any "
    +        + "of the Pulsar messages that are pulled but cannot be parsed or written with the configured Record Reader or "
    +        + "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the "
    +        + "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual "
    +        + "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the "
    +        + "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.")
    +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "record.count", description = "The number of records received")
    +})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, PublishPulsarRecord_1_X.class})
    +public class ConsumePulsarRecord_1_X extends AbstractPulsarConsumerProcessor {
    +
    +    public static final String MSG_COUNT = "record.count";
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum Async Requests")
    +            .description("The number of records to combine into a single flow file.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
    +            .name("parse_failure")
    +            .description("FlowFiles for which the content was not prasable")
    +            .build();
    +
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(RECORD_READER);
    +        properties.add(RECORD_WRITER);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +        properties.add(BATCH_SIZE);
    +        properties.add(MAX_WAIT_TIME);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_PARSE_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER)
    +                .asControllerService(RecordReaderFactory.class);
    +
    +        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER)
    +                .asControllerService(RecordSetWriterFactory.class);
    +
    +        List<Message> messages = null;
    +        try {
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +                // Launch consumers
    +                consumeAsync(context, session);
    +
    +                // Handle completed consumers
    +                messages = handleAsync(context, session);
    +
    +            } else {
    +               messages = consume(context, session);
    +            }
    +
    +            processMessages(context, session, messages, readerFactory, writerFactory, context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean());
    +
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +
    +    }
    +
    +    /**
    +     * Pull messages off of the topic until we have reached BATCH_SIZE or BATCH_DURATION
    +     * whichever occurs first.
    +     */
    +    private List<Message> consume(ProcessContext context, ProcessSession session) throws PulsarClientException {
    +
    +        final Integer queryTimeout = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
    +
    +        Consumer consumer = getWrappedConsumer(context).getConsumer();
    +
    +        List<Message> messages = new ArrayList<Message>();
    +
    +        long startTime = System.currentTimeMillis();
    +
    +        while ( (messages.size() < context.getProperty(BATCH_SIZE).asInteger())
    +                && (queryTimeout == 0 || System.currentTimeMillis() - startTime < queryTimeout ) ) {
    +            messages.add(consumer.receive());
    +        }
    +
    +        return messages;
    +    }
    +
    +    private void processMessages(ProcessContext context, ProcessSession session, List<Message> messages, RecordReaderFactory readerFactory,
    +        RecordSetWriterFactory writerFactory, boolean async) throws PulsarClientException {
    +
    +        if (messages.isEmpty())
    +           return;
    +
    +        final AtomicLong messagesReceived = new AtomicLong(0L);
    +
    +        final BiConsumer<Message, Exception> handleParseFailure = (msg, e) -> {
    +            FlowFile failureFlowFile = session.create();
    +            if (msg.getData() != null) {
    +               failureFlowFile = session.write(failureFlowFile, out -> out.write(msg.getData()));
    +            }
    +            session.transfer(failureFlowFile, REL_PARSE_FAILURE);
    +        };
    +
    +        RecordSetWriter writer = null;
    +        FlowFile flowFile = null;
    +        OutputStream rawOut = null;
    +
    +        for (Message msg: messages)  {
    +            RecordReader reader = getRecordReader(msg, readerFactory, handleParseFailure);
    --- End diff --
    
    Are you assuming that the message can have more than one record in it? If not, this could generate a lot of garbage.
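    
    If each message can hold multiple records, one way to cut down on the per-message allocations would be to parse the whole batch through a single reader. A hypothetical sketch, assuming every message in the batch shares the same schema and uses a record format whose payloads can simply be concatenated (e.g. newline-delimited JSON):
    
        // Concatenate all message payloads into one stream, so a single
        // RecordReader can be created per batch rather than per message.
        List<InputStream> payloads = new ArrayList<>();
        for (Message msg : messages) {
            payloads.add(new ByteArrayInputStream(msg.getData()));
        }
        InputStream combined = new SequenceInputStream(Collections.enumeration(payloads));
        // a single reader built over 'combined' can then iterate all records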


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183208994
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/ResourceFactory.java ---
    @@ -0,0 +1,40 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar.pool;
    +
    +import java.util.Properties;
    +
    +/**
    + * Factory pattern interface for @PoolableResource objects. Concrete implementations
    + * of this interface will be responsible for the creation of @PoolableResource objects
    + * based on the Properties passed in.
    + *
    + * @author david
    --- End diff --
    
    Please remove


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183209203
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.pulsar.client.api.ClientConfiguration;
    +import org.apache.pulsar.client.api.PulsarClient;
    +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
    +import org.apache.pulsar.client.impl.auth.AuthenticationTls;
    +
    +@Tags({ "Pulsar"})
    +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.")
    +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool {
    +
    +    public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor
    +            .Builder().name("PULSAR_SERVICE_URL")
    +            .displayName("Pulsar Service URL")
    +            .description("URL for the Pulsar cluster, e.g localhost:6650")
    +            .required(true)
    +            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum concurrent lookup-requests")
    +            .description("Number of concurrent lookup-requests allowed on each broker-connection to prevent "
    +                    + "overload on broker. (default: 5000) It should be configured with higher value only in case "
    +                    + "of it requires to produce/subscribe on thousands of topics")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder()
    +            .name("Maximum connects per Pulsar broker")
    +            .description("Sets the max number of connection that the client library will open to a single broker.\n" +
    +                    "By default, the connection pool will use a single connection for all the producers and consumers. " +
    +                    "Increasing this parameter may improve throughput when using many producers over a high latency connection")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder()
    +            .name("I/O Threads")
    +            .description("The number of threads to be used for handling connections to brokers (default: 1 thread)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor LISTENER_THREADS = new PropertyDescriptor.Builder()
    +            .name("Listener Threads")
    +            .description("The number of threads to be used for message listeners (default: 1 thread)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor MAXIMUM_REJECTED_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum rejected requests per connection")
    +            .description("Max number of broker-rejected requests in a certain time-frame (30 seconds) after " +
    +                "which current connection will be closed and client creates a new connection that give " +
    +                "chance to connect a different broker (default: 50)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor OPERATION_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Operation Timeout")
    +            .description("Producer-create, subscribe and unsubscribe operations will be retried until this " +
    +                "interval, after which the operation will be maked as failed (default: 30 seconds)")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("30")
    +            .build();
    +
    +    public static final PropertyDescriptor STATS_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("Stats interval")
    +            .description("The interval between each stat info (default: 60 seconds) Stats will be activated " +
    +                "with positive statsIntervalSeconds It should be set to at least 1 second")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("60")
    +            .build();
    +
    +    public static final PropertyDescriptor USE_TCP_NO_DELAY = new PropertyDescriptor.Builder()
    +            .name("Use TCP nodelay flag")
    +            .description("Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm.\n"
    +                    + "No-delay features make sure packets are sent out on the network as soon as possible, and it's critical "
    +                    + "to achieve low latency publishes. On the other hand, sending out a huge number of small packets might "
    +                    + "limit the overall throughput, so if latency is not a concern, it's advisable to set the useTcpNoDelay "
    +                    + "flag to false.")
    +            .required(false)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_PRODUCERS = new PropertyDescriptor
    +            .Builder().name("MAX_PRODUCERS")
    +            .displayName("Producer Pool Size")
    +            .description("The Maximum Number of Pulsar Producers created by this Pulsar Client Pool")
    +            .required(true)
    +            .defaultValue("10")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_CONSUMERS = new PropertyDescriptor
    +            .Builder().name("MAX_CONSUMERS")
    +            .displayName("Consumer Pool Size")
    +            .description("The Maximum Number of Pulsar consumers created by this Pulsar Client Pool")
    +            .required(true)
    +            .defaultValue("10")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    +            .name("ssl.context.service")
    +            .displayName("SSL Context Service")
    +            .description("Specifies the SSL Context Service to use for communicating with Pulsar.")
    +            .required(false)
    +            .identifiesControllerService(SSLContextService.class)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties;
    +    private volatile PulsarClient client;
    +
    +        private volatile ResourcePoolImpl<PulsarProducer> producers;
    +        private volatile ResourcePoolImpl<PulsarConsumer> consumers;
    +        private ClientConfiguration clientConfig;
    +
    +    static {
    +        final List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(PULSAR_SERVICE_URL);
    +        props.add(MAX_CONSUMERS);
    +        props.add(MAX_PRODUCERS);
    +        props.add(CONCURRENT_LOOKUP_REQUESTS);
    +        props.add(CONNECTIONS_PER_BROKER);
    +        props.add(IO_THREADS);
    +        props.add(LISTENER_THREADS);
    +        props.add(MAXIMUM_REJECTED_REQUESTS);
    +        props.add(OPERATION_TIMEOUT);
    +        props.add(STATS_INTERVAL);
    +        props.add(USE_TCP_NO_DELAY);
    +        properties = Collections.unmodifiableList(props);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    /**
    +     * @param context the configuration context
    +     * @throws InitializationException if unable to create a database connection
    +     * @throws UnsupportedAuthenticationException if the Broker URL uses a non-supported authentication mechanism
    +     */
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException, UnsupportedAuthenticationException {
    +
    +            createClient(context);
    +
    +            if (this.client == null) {
    +                throw new InitializationException("Unable to create Pulsar Client");
    +            }
    +
    +            producers = new ResourcePoolImpl<PulsarProducer>(new PulsarProducerFactory(client), context.getProperty(MAX_PRODUCERS).asInteger());
    +            consumers = new ResourcePoolImpl<PulsarConsumer>(new PulsarConsumerFactory(client,
    +                buildPulsarBrokerRootUrl(context.getProperty(PULSAR_SERVICE_URL).getValue(), getClientConfig(context).isUseTls())),
    +                context.getProperty(MAX_CONSUMERS).asInteger());
    +
    +    }
    +
    +    private void createClient(final ConfigurationContext context) throws InitializationException {
    +
    +        try {
    +            this.client = PulsarClient.create(buildPulsarBrokerRootUrl(context.getProperty(PULSAR_SERVICE_URL).getValue(),
    +                        getClientConfig(context).isUseTls()), getClientConfig(context));
    +
    +        } catch (Exception e) {
    +            throw new InitializationException("Unable to create Pulsar Client", e);
    +        }
    +
    +    }
    +
    +    private static String buildPulsarBrokerRootUrl(String uri, boolean tlsEnabled) {
    +        StringBuilder builder = new StringBuilder();
    +        builder.append("pulsar");
    +
    +        if (tlsEnabled) {
    +           builder.append("+ssl");
    +        }
    +
    +        builder.append("://");
    +        builder.append(uri);
    +        return builder.toString();
    +    }
    +
    +    private ClientConfiguration getClientConfig(ConfigurationContext context) throws UnsupportedAuthenticationException {
    +
    +        if (clientConfig == null) {
    +            clientConfig = new ClientConfiguration();
    +
    +            if (context.getProperty(CONCURRENT_LOOKUP_REQUESTS).isSet()) {
    +                clientConfig.setConcurrentLookupRequest(context.getProperty(CONCURRENT_LOOKUP_REQUESTS).asInteger());
    +            }
    +
    +            if (context.getProperty(CONNECTIONS_PER_BROKER).isSet()) {
    +                clientConfig.setConnectionsPerBroker(context.getProperty(CONNECTIONS_PER_BROKER).asInteger());
    +            }
    +
    +            if (context.getProperty(IO_THREADS).isSet()) {
    +                clientConfig.setIoThreads(context.getProperty(IO_THREADS).asInteger());
    +            }
    +
    +            if (context.getProperty(LISTENER_THREADS).isSet()) {
    +                clientConfig.setListenerThreads(context.getProperty(LISTENER_THREADS).asInteger());
    +            }
    +
    +            if (context.getProperty(MAXIMUM_REJECTED_REQUESTS).isSet()) {
    +                clientConfig.setMaxNumberOfRejectedRequestPerConnection(context.getProperty(MAXIMUM_REJECTED_REQUESTS).asInteger());
    +            }
    +
    +            if (context.getProperty(OPERATION_TIMEOUT).isSet()) {
    +                clientConfig.setOperationTimeout(context.getProperty(OPERATION_TIMEOUT).asInteger(), TimeUnit.SECONDS);
    +            }
    +
    +            if (context.getProperty(STATS_INTERVAL).isSet()) {
    +                clientConfig.setStatsInterval(context.getProperty(STATS_INTERVAL).asLong(), TimeUnit.SECONDS);
    +            }
    +
    +            if (context.getProperty(USE_TCP_NO_DELAY).isSet()) {
    +                clientConfig.setUseTcpNoDelay(context.getProperty(USE_TCP_NO_DELAY).asBoolean());
    +            }
    +
    +            // Configure TLS
    +            final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    +
    +            if (sslContextService != null && sslContextService.isTrustStoreConfigured() && sslContextService.isKeyStoreConfigured()) {
    +                    clientConfig.setUseTls(true);
    +                    clientConfig.setTlsTrustCertsFilePath(sslContextService.getTrustStoreFile());
    +
    +                    Map<String, String> authParams = new HashMap<>();
    +
    +                    // TODO This should be a different value than the TlsTrustCertsFilePath above.
    +                    authParams.put("tlsCertFile", sslContextService.getTrustStoreFile());
    --- End diff --
    
    Are you sure about these two params? `tlsCertFile` intuitively comes to mind as a `.pem` file, not a keystore. Same with the `tlsKeyFile` not being a keystore file (JKS or P12).
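    
    For reference, this is roughly how I'd expect the TLS auth params to be wired up. A sketch only -- the file paths are placeholders, and the point is that both values should reference PEM files rather than the JKS trust store:
    
        Map<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", "/path/to/client-cert.pem"); // client certificate (PEM)
        authParams.put("tlsKeyFile", "/path/to/client-key.pem");   // client private key (PEM)
    
        AuthenticationTls auth = new AuthenticationTls();
        auth.configure(authParams);
        clientConfig.setAuthentication(auth);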


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183235809
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/pom.xml ---
    @@ -0,0 +1,67 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-pulsar-bundle</artifactId>
    +        <version>1.7.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-pulsar-client-service</artifactId>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-pulsar-client-service-api</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    --- End diff --
    
    This should be marked as provided as well.
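    
    i.e. the same coordinates as above, with the scope added:
    
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-pulsar-client-service-api</artifactId>
            <version>1.7.0-SNAPSHOT</version>
            <scope>provided</scope>
        </dependency>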


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    At that point, this should hopefully clear things up:
    
    `git push origin --force NIFI-4914`


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    @david-streamlio you brought in a whole lot of commits from other people with that merge from upstream/master. I checked out your branch and did a rebase on it (`git rebase master`) and that seemed to clear it up. I would recommend doing that on your branch locally and verifying that that "merge commit" with the commit message `Merge remote-tracking branch 'upstream/master' into NIFI-4914` goes away.
    
    As a rule of thumb, this is how you want to do this sort of update:
    
    1. git checkout master
    2. git pull upstream master
    3. git checkout YOUR_BRANCH
    4. git rebase master
    
    Once you've done that, the last command will replay your commits on top of the most recent version of upstream/master.


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    So, is this a change in the 1.7.x code base, or is it already in the 1.6.0 code? I created my fork back on Feb 22nd based on the 1.6.0-SNAPSHOT branch, which does not have these enums. Should I create a new fork of 1.6.x, or of 1.7.x? Please advise.


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183210297
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java ---
    @@ -0,0 +1,320 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar;
    +
    +import java.util.Properties;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutorCompletionService;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.nifi.pulsar.PulsarProducer;
    +import org.apache.nifi.pulsar.cache.LRUCache;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.pulsar.client.api.CompressionType;
    +import org.apache.pulsar.client.api.MessageId;
    +import org.apache.pulsar.client.api.Producer;
    +import org.apache.pulsar.client.api.ProducerConfiguration;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
    +
    +public abstract class AbstractPulsarProducerProcessor extends AbstractPulsarProcessor {
    +
    +    public static final String MSG_COUNT = "msg.count";
    +    public static final String TOPIC_NAME = "topic.name";
    +
    +    static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression");
    +    static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
    +    static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
    +
    +    static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition");
    +    static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route messages to all "
    +                                                                                                                       + "partitions in a round robin manner");
    +    static final AllowableValue MESSAGE_ROUTING_MODE_SINGLE_PARTITION = new AllowableValue("SinglePartition", "Single Partition", "Route messages to a single partition");
    +
    +    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
    +            .name("topic")
    +            .displayName("Topic Name")
    +            .description("The name of the Pulsar Topic.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final PropertyDescriptor ASYNC_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Async Enabled")
    +            .description("Control whether the messages will be sent asyncronously or not. Messages sent"
    +                    + " syncronously will be acknowledged immediately before processing the next message, while"
    +                    + " asyncronous messages will be acknowledged after the Pulsar broker responds.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_ASYNC_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum Async Requests")
    +            .description("The maximum number of outstanding asynchronous publish requests for this processor. "
    +                    + "Each asynchronous call requires memory, so avoid setting this value to high.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCHING_ENABLED = new PropertyDescriptor.Builder()
    +            .name("Batching Enabled")
    +            .description("Control whether automatic batching of messages is enabled for the producer. "
    +                    + "default: false [No batching] When batching is enabled, multiple calls to "
    +                    + "Producer.sendAsync can result in a single batch to be sent to the broker, leading "
    +                    + "to better throughput, especially when publishing small messages. If compression is "
    +                    + "enabled, messages will be compressed at the batch level, leading to a much better "
    +                    + "compression ratio for similar headers or contents. When enabled default batch delay "
    +                    + "is set to 10 ms and default batch size is 1000 messages")
    +            .required(false)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCHING_MAX_MESSAGES = new PropertyDescriptor.Builder()
    +            .name("Batching Max Messages")
    +            .description("Set the maximum number of messages permitted in a batch. default: "
    +                    + "1000 If set to a value greater than 1, messages will be queued until this "
    +                    + "threshold is reached or batch interval has elapsed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("Batch Interval")
    +            .description("Set the time period within which the messages sent will be batched default: 10ms "
    +                    + "if batch messages are enabled. If set to a non zero value, messages will be queued until "
    +                    + "this time interval or until the Batching Max Messages threshould has been reached")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .defaultValue("10")
    +            .build();
    +
    +    public static final PropertyDescriptor BLOCK_IF_QUEUE_FULL = new PropertyDescriptor.Builder()
    +            .name("Block if Message Queue Full")
    +            .description("Set whether the processor should block when the outgoing message queue is full. "
    +                    + "Default is false. If set to false, send operations will immediately fail with "
    +                    + "ProducerQueueIsFullError when there is no space left in pending queue.")
    +            .required(false)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
    +            .name("Compression Type")
    +            .description("Set the compression type for the producer.")
    +            .required(false)
    +            .allowableValues(COMPRESSION_TYPE_NONE, COMPRESSION_TYPE_LZ4, COMPRESSION_TYPE_ZLIB)
    +            .defaultValue(COMPRESSION_TYPE_NONE.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor MESSAGE_ROUTING_MODE = new PropertyDescriptor.Builder()
    +            .name("Message Routing Mode")
    +            .description("Set the message routing mode for the producer. This applies only if the destination topic is partitioned")
    +            .required(false)
    +            .allowableValues(MESSAGE_ROUTING_MODE_CUSTOM_PARTITION, MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION, MESSAGE_ROUTING_MODE_SINGLE_PARTITION)
    +            .defaultValue(MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor PENDING_MAX_MESSAGES = new PropertyDescriptor.Builder()
    +            .name("Max Pending Messages")
    +            .description("Set the max size of the queue holding the messages pending to receive an "
    +                    + "acknowledgment from the broker.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    protected LRUCache<String, PulsarProducer> producers;
    +    protected ProducerConfiguration producerConfig;
    +
    +    // Pool for running multiple publish Async requests
    +    protected ExecutorService publisherPool;
    +    protected ExecutorCompletionService<MessageId> publisherService;
    +
    +
    +    @OnScheduled
    +    public void init(ProcessContext context) {
    +        // We only need this if we are running in Async mode
    +        if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +            publisherPool = Executors.newFixedThreadPool(context.getProperty(MAX_ASYNC_REQUESTS).asInteger());
    +            publisherService = new ExecutorCompletionService<>(publisherPool);
    +        }
    +    }
    +
    +    @OnUnscheduled
    +    public void shutDown(final ProcessContext context) {
    +
    +       if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +           // Stop all the async publishers
    +           try {
    +              publisherPool.shutdown();
    +              publisherPool.awaitTermination(20, TimeUnit.SECONDS);
    +           } catch (InterruptedException e) {
    +              getLogger().error("Unable to stop all the Pulsar Producers", e);
    +           }
    +       }
    +    }
    +
    +    @OnStopped
    +    public void cleanUp(final ProcessContext context) {
    +       // Close all of the producers and invalidate them, so they get removed from the Resource Pool
    +       getProducerCache(context).clear();
    +    }
    +
    +    protected void sendAsync(Producer producer, ProcessSession session, FlowFile flowFile, byte[] messageContent) {
    +
    +        try {
    +              publisherService.submit(new Callable<MessageId>() {
    +                 @Override
    +                 public MessageId call() throws Exception {
    +                   try {
    +                     return producer.sendAsync(messageContent).handle((msgId, ex) -> {
    +                       if (msgId != null) {
    +                           session.putAttribute(flowFile, MSG_COUNT, "1");
    +                           session.putAttribute(flowFile, TOPIC_NAME, producer.getTopic());
    +                           session.adjustCounter("Messages Sent", 1, true);
    +                           session.getProvenanceReporter().send(flowFile, "Sent async message to " + producer.getTopic() );
    --- End diff --
    
    This should have the full transit URL in it.
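
    Something like the following, perhaps (a minimal sketch; `wrappedProducer` and its `getTransitURL()` method are assumptions here, mirroring the consumer-side `getWrappedConsumer(context).getTransitURL()` call used elsewhere in this PR):

    ```
    // Hypothetical: report the full transit URL (e.g. pulsar://broker:6650/topic)
    // in the provenance event instead of just the topic name.
    session.getProvenanceReporter().send(flowFile,
            "Sent async message to " + wrappedProducer.getTransitURL());
    ```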


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    @pvillard31 thanks that makes a lot more sense to me now.  


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    Davids-MacBook-Pro:nifi david$ git checkout NIFI-4914
    Switched to branch 'NIFI-4914'
    Your branch is up to date with 'origin/NIFI-4914'.
    Davids-MacBook-Pro:nifi david$ git rebase master
    First, rewinding head to replay your work on top of it...
    Applying: Added Apache Pulsar Processors and Controller Service
    Applying: Changed code to use new ExpressionLanguageScope Enum
    Applying: Changed artifact versions to 1.7.0-SNAPSHOT
    Applying: Added Apache Pulsar Processors and Controller Service
    Using index info to reconstruct a base tree...
    M	nifi-nar-bundles/pom.xml
    .git/rebase-apply/patch:848: trailing whitespace.
      
    .git/rebase-apply/patch:854: trailing whitespace.
      
    .git/rebase-apply/patch:857: trailing whitespace.
      
    .git/rebase-apply/patch:859: space before tab in indent.
      	<dependency>
    .git/rebase-apply/patch:865: trailing whitespace.
      
    warning: squelched 161 whitespace errors
    warning: 166 lines add whitespace errors.
    Falling back to patching base and 3-way merge...
    Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/pom.xml
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/pom.xml
    Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestPublishPulsar_1_X.java
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestPublishPulsar_1_X.java
    Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsar_1_X.java
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsar_1_X.java
    Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsarRecord_1_X.java
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsarRecord_1_X.java
    Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/AbstractPulsarProcessorTest.java
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/AbstractPulsarProcessorTest.java
    Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsar_1_X.java
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsar_1_X.java
    Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java
    Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java
    Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java
    Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml
    Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-nar/pom.xml
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-nar/pom.xml
    Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/pom.xml
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/pom.xml
    Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-nar/pom.xml
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-nar/pom.xml
    Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/pom.xml
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/pom.xml
    error: Failed to merge in the changes.
    Patch failed at 0004 Added Apache Pulsar Processors and Controller Service
    The copy of the patch that failed is found in: .git/rebase-apply/patch
    
    Resolve all conflicts manually, mark them as resolved with
    "git add/rm <conflicted_files>", then run "git rebase --continue".
    You can instead skip this commit: run "git rebase --skip".
    To abort and get back to the state before "git rebase", run "git rebase --abort".


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    I pulled your branch and got a lot of merge conflicts when I tried rebasing against master. I am pretty sure that @mattyb149 is right or on the right track. When you followed those four steps, did you adjust the name `upstream` to match whatever you call the remote that points to `https://github.com/apache/nifi`? Because if you did all of that, you would have had a real fun time with all of the merge conflicts on rebasing :)


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183235251
  
    --- Diff: nifi-nar-bundles/pom.xml ---
    @@ -93,6 +93,7 @@
             <module>nifi-spark-bundle</module>
             <module>nifi-atlas-bundle</module>
             <module>nifi-druid-bundle</module>
    +        <module>nifi-pulsar-bundle</module>
    --- End diff --
    
    You need to add a dependency declaration for the NAR in `nifi-assembly/pom.xml`. Just putting that note here since there's no really good place to track that in the review.
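
    For reference, the declaration would look roughly like this (artifact name, version, and type taken from this PR; the exact placement among the other NAR dependencies in `nifi-assembly/pom.xml` is a guess):

    ```
    <!-- nifi-assembly/pom.xml: bundle the Pulsar NAR into the assembly -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-pulsar-nar</artifactId>
        <version>1.7.0-SNAPSHOT</version>
        <type>nar</type>
    </dependency>
    ```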


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183209046
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.pulsar.client.api.ClientConfiguration;
    +import org.apache.pulsar.client.api.PulsarClient;
    +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
    +import org.apache.pulsar.client.impl.auth.AuthenticationTls;
    +
    +@Tags({ "Pulsar"})
    --- End diff --
    
    Should be fleshed out with more tags.
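
    For example, something along these lines (the particular tags are only a suggestion, borrowed from the tag sets on the processors in this PR):

    ```
    @Tags({"Apache", "Pulsar", "client", "pool", "publish", "consume", "PubSub"})
    ```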


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    @david-streamlio Ok, I think I figured out what happened. At some point, it looks like you accidentally did a pull on upstream master into your branch. The fact that you keep having over 200 commits even with rebasing against master very strongly suggests that. What I did to verify was I checked out your branch, pushed it to my fork and saw 210ish commits with a merge conflict into my master. So I locally rebased against master, did a forced push and it dropped it down to ~7 commits.
    
    So carefully follow the four steps I gave you:
    
    1. git checkout master
    2. git pull upstream master
    3. git checkout NIFI-4914
    4. git rebase master
    
    **Make sure** that `upstream` is changed to whatever you call `apache/nifi` on github.com. Then do:
    
    `git push origin --force NIFI-4914` and it should all clear up to a few commits.


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183234861
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/InFlightMessageMonitor.java ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +/**
    + * Helper class to monitor the asynchronous submission of a large number
    + * of records to Apache Pulsar.
    + *
    + * @author david
    --- End diff --
    
    Please remove.


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    Well, that didn't go as well as we had hoped.


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    @MikeThomsen I will work on those changes now and have them completed ASAP....Thanks for your patience 


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    When I added it to nifi-assembly, I got this error on startup:
    
    ```
    java.util.ServiceConfigurationError: org.apache.nifi.processor.Processor: Provider org.apache.nifi.processors.pulsar.pubsub.ConsumePulsarRecord_1_X could not be instantiated
    	at java.util.ServiceLoader.fail(ServiceLoader.java:232)
    	at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
    	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
    	at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    	at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    	at org.apache.nifi.nar.ExtensionManager.loadExtensions(ExtensionManager.java:148)
    	at org.apache.nifi.nar.ExtensionManager.discoverExtensions(ExtensionManager.java:123)
    	at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:771)
    	at org.apache.nifi.NiFi.<init>(NiFi.java:157)
    	at org.apache.nifi.NiFi.<init>(NiFi.java:71)
    	at org.apache.nifi.NiFi.main(NiFi.java:292)
    Caused by: java.lang.NoClassDefFoundError: org/apache/nifi/serialization/MalformedRecordException
    	at java.lang.Class.getDeclaredConstructors0(Native Method)
    	at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
    	at java.lang.Class.getConstructor0(Class.java:3075)
    	at java.lang.Class.newInstance(Class.java:412)
    	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
    	... 8 common frames omitted
    Caused by: java.lang.ClassNotFoundException: org.apache.nifi.serialization.MalformedRecordException
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    	... 13 common frames omitted
    ```
    
    Your declaration for `nifi-record` looks right, so we'll have to dig deeper once you get these changes done.
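
    One thing worth checking (an unverified guess at the root cause): record-oriented classes such as `MalformedRecordException` are normally provided at runtime by the standard services API NAR, so the Pulsar NAR may need to declare it as a parent NAR dependency, e.g.:

    ```
    <!-- nifi-pulsar-nar/pom.xml: inherit the standard services API class loader
         so classes like org.apache.nifi.serialization.MalformedRecordException resolve -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-standard-services-api-nar</artifactId>
        <version>1.7.0-SNAPSHOT</version>
        <type>nar</type>
    </dependency>
    ```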


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r180329047
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-nar/pom.xml ---
    @@ -0,0 +1,50 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-pulsar-bundle</artifactId>
    +        <version>1.7.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-pulsar-nar</artifactId>
    +    <packaging>nar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-pulsar-client-service-api-nar</artifactId>
    +            <version>1.6.0-SNAPSHOT</version>
    +            <type>nar</type>
    +		</dependency>
    +		
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-pulsar-processors</artifactId>
    +            <version>1.6.0-SNAPSHOT</version>
    +        </dependency>
    +        
    +		<dependency>
    +			<groupId>org.apache.nifi</groupId>
    +			<artifactId>nifi-pulsar-client-service</artifactId>
    +			<version>1.6.0-SNAPSHOT</version>
    --- End diff --
    
    Needs to be updated to 1.7.0-SNAPSHOT,
    and the indentation needs to be fixed.


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    @joewitt I think if he were to rebase and drop [this commit](https://github.com/apache/nifi/pull/2614/commits/99a980cc1e6bbb45e64fb3431545023eefb8522c) it should fix that. What do you think?


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183234534
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.function.BiConsumer;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +
    +@CapabilityDescription("Consumes messages from Apache Pulsar specifically built against the Pulsar 1.x Consumer API. "
    +        + "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_0. Please note that, at this time, "
    +        + "the Processor assumes that all records that are retrieved from a given partition have the same schema. If any "
    +        + "of the Pulsar messages that are pulled but cannot be parsed or written with the configured Record Reader or "
    +        + "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the "
    +        + "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual "
    +        + "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the "
    +        + "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.")
    +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "record.count", description = "The number of records received")
    +})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, PublishPulsarRecord_1_X.class})
    +public class ConsumePulsarRecord_1_X extends AbstractPulsarConsumerProcessor {
    +
    +    public static final String MSG_COUNT = "record.count";
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum Async Requests")
    +            .description("The number of records to combine into a single flow file.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
    +            .name("parse_failure")
    +            .description("FlowFiles for which the content was not prasable")
    +            .build();
    +
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(RECORD_READER);
    +        properties.add(RECORD_WRITER);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +        properties.add(BATCH_SIZE);
    +        properties.add(MAX_WAIT_TIME);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_PARSE_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER)
    +                .asControllerService(RecordReaderFactory.class);
    +
    +        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER)
    +                .asControllerService(RecordSetWriterFactory.class);
    +
    +        List<Message> messages = null;
    +        try {
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +                // Launch consumers
    +                consumeAsync(context, session);
    +
    +                // Handle completed consumers
    +                messages = handleAsync(context, session);
    +
    +            } else {
    +               messages = consume(context, session);
    +            }
    +
    +            processMessages(context, session, messages, readerFactory, writerFactory, context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean());
    +
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +
    +    }
    +
    +    /**
    +     * Pull messages off of the topic until we have received BATCH_SIZE messages
    +     * or MAX_WAIT_TIME has elapsed, whichever occurs first.
    +     */
    +    private List<Message> consume(ProcessContext context, ProcessSession session) throws PulsarClientException {
    +
    +        final Integer queryTimeout = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
    +
    +        Consumer consumer = getWrappedConsumer(context).getConsumer();
    +
    +        List<Message> messages = new ArrayList<Message>();
    +
    +        long startTime = System.currentTimeMillis();
    +
    +        while ( (messages.size() < context.getProperty(BATCH_SIZE).asInteger())
    +                && (queryTimeout == 0 || System.currentTimeMillis() - startTime < queryTimeout ) ) {
    +            messages.add(consumer.receive());
    +        }
    +
    +        return messages;
    +    }
    +
    +    private void processMessages(ProcessContext context, ProcessSession session, List<Message> messages, RecordReaderFactory readerFactory,
    +        RecordSetWriterFactory writerFactory, boolean async) throws PulsarClientException {
    +
    +        if (messages.isEmpty()) {
    +           return;
    +        }
    +
    +        final AtomicLong messagesReceived = new AtomicLong(0L);
    +
    +        final BiConsumer<Message, Exception> handleParseFailure = (msg, e) -> {
    +            FlowFile failureFlowFile = session.create();
    +            if (msg.getData() != null) {
    +               failureFlowFile = session.write(failureFlowFile, out -> out.write(msg.getData()));
    +            }
    +            session.transfer(failureFlowFile, REL_PARSE_FAILURE);
    +        };
    +
    +        RecordSetWriter writer = null;
    +        FlowFile flowFile = null;
    +        OutputStream rawOut = null;
    +
    +        for (Message msg: messages)  {
    +            RecordReader reader = getRecordReader(msg, readerFactory, handleParseFailure);
    +            Record firstRecord = getFirstRecord(msg, reader, handleParseFailure);
    +
    +            if (firstRecord == null) {
    +                // If the message doesn't contain any record, just ack the message
    +                if (async) {
    +                   ackService.submit(new Callable<Void>() {
    +                      @Override
    +                      public Void call() throws Exception {
    +                         return getWrappedConsumer(context).getConsumer().acknowledgeAsync(msg).get();
    +                      }
    +                   });
    +                } else {
    +                   getWrappedConsumer(context).getConsumer().acknowledge(msg);
    +                }
    +
    +                continue;
    +            }
    +
    +            // Session / FlowFile starts here
    +            if (flowFile == null) {
    +                flowFile = session.create();
    +                rawOut = session.write(flowFile);
    +            }
    +
    +            // Create the Record Writer
    +            if (writer == null) {
    +                try {
    +                    writer = getRecordWriter(writerFactory, firstRecord.getSchema(), rawOut);
    +                    writer.beginRecordSet();
    +                } catch (SchemaNotFoundException | IOException ex) {
    +                    getLogger().error("Failed to obtain Schema for FlowFile.", ex);
    +                    throw new ProcessException(ex);
    +                }
    +            }
    +
    +            // Read all the records from this message, as it may contain several
    +            try {
    +                for (Record record = firstRecord; record != null; record = reader.nextRecord()) {
    +                    writer.write(record);
    +                    messagesReceived.incrementAndGet();
    +                }
    +            } catch (MalformedRecordException | IOException mEx) {
    +                handleParseFailure.accept(msg, mEx);
    +            }
    +
    +            // Acknowledge the message
    +            if (async) {
    +               ackService.submit(new Callable<Void>() {
    +                   @Override
    +                   public Void call() throws Exception {
    +                      return getWrappedConsumer(context).getConsumer().acknowledgeAsync(msg).get();
    +                   }
    +               });
    +            } else {
    +                getWrappedConsumer(context).getConsumer().acknowledge(msg);
    +            }
    +        }
    +
    +        // Clean-up and transfer session
    +        try {
    +            if (writer != null)
    --- End diff --
    
    Needs curly brackets.
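
    For the record, the braced form would be along these lines (a sketch; the body is a placeholder since the diff is truncated here):

    ```
    if (writer != null) {
        // ... existing RecordSetWriter cleanup ...
    }
    ```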


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    I think it made it into "1.6.0-SNAPSHOT" after RC3 was cut. You have a bunch of errors in there, but they look like this:
    
    > [ERROR] testSingleRecordSuccess(org.apache.nifi.processors.pulsar.pubsub.async.TestAsyncPublishPulsarRecord_1_X)  Time elapsed: 0.011 s  <<< FAILURE!
    java.lang.AssertionError: java.lang.IllegalStateException: Attempting to evaluate expression language for topic using flow file attributes but the scope evaluation is set to NONE. The proper scope should be set in the property descriptor using PropertyDescriptor.Builder.expressionLanguageSupported(ExpressionLanguageScope)
    	at org.apache.nifi.processors.pulsar.pubsub.async.TestAsyncPublishPulsarRecord_1_X.testSingleRecordSuccess(TestAsyncPublishPulsarRecord_1_X.java:87)
    Caused by: java.lang.IllegalStateException: Attempting to evaluate expression language for topic using flow file attributes but the scope evaluation is set to NONE. The proper scope should be set in the property descriptor using PropertyDescriptor.Builder.expressionLanguageSupported(ExpressionLanguageScope)
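
    The message spells out the fix: the `topic` property descriptor needs its expression-language scope declared. A sketch of the corrected descriptor (only the `expressionLanguageSupported` line is the point; the rest of the builder is assumed from the other descriptors in this PR):

    ```
    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
            .name("topic")
            .displayName("Topic Name")
            .description("The name of the Pulsar topic.")
            .required(true)
            // Declare the scope so EL can be evaluated against FlowFile attributes
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    ```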


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183234943
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java ---
    @@ -0,0 +1,294 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.pulsar.client.api.MessageId;
    +import org.apache.pulsar.client.api.Producer;
    +
    +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"})
    +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. "
    +    + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
    +    + "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to "
    +        + "FlowFiles that are routed to success.")
    +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class})
    +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor {
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(RECORD_READER);
    +        properties.add(RECORD_WRITER);
    +        properties.add(TOPIC);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(BATCHING_ENABLED);
    +        properties.add(BATCHING_MAX_MESSAGES);
    +        properties.add(BATCH_INTERVAL);
    +        properties.add(BLOCK_IF_QUEUE_FULL);
    +        properties.add(COMPRESSION_TYPE);
    +        properties.add(MESSAGE_ROUTING_MODE);
    +        properties.add(PENDING_MAX_MESSAGES);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        if (StringUtils.isBlank(topic)) {
    +            getLogger().error("Invalid topic specified {}", new Object[] {topic});
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        // Read the contents of the FlowFile into a byte array
    +        final byte[] messageContent = new byte[(int) flowFile.getSize()];
    --- End diff --
    
    This could break in some use cases. For example, I have processed 50GB text files using NiFi before. Not a common use case, but I'd like to know what others think here.
    
    @bbende @markap14 @alopresto @pvillard31 @mattyb149 can one of you take a look at this and weigh in on whether there is a better way to handle the input here to accommodate very large record sets?
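
    One possible direction (a sketch only, under the assumption that the record reading can be moved inside the session read callback; producer wiring omitted): stream the content through the Record Reader instead of materializing the whole FlowFile, so memory use is bounded by a single record rather than the file size:

    ```
    session.read(flowFile, in -> {
        try (RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
            Record record;
            while ((record = reader.nextRecord()) != null) {
                // serialize this record and hand it to the Pulsar producer here
            }
        } catch (MalformedRecordException | SchemaNotFoundException e) {
            throw new ProcessException("Failed to parse records", e);
        }
    });
    ```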


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    Davids-MacBook-Pro:nifi david$ git checkout NIFI-4914
    Switched to branch 'NIFI-4914'
    Your branch is up to date with 'origin/NIFI-4914'.
    Davids-MacBook-Pro:nifi david$ git rebase --continue
    No rebase in progress?


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183208828
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.controller.ControllerService;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +
    +
    +@Tags({"Pulsar"})
    +@CapabilityDescription("Provides the ability to create Pulsar Producer / Consumer instances on demand, based on the configuration."
    +                     + "properties defined")
    +/**
    + * Service definition for apache Pulsar Client ControllerService
    + * responsible for maintaining a pool of @PulsarProducer and
    + * @PulsarConsumer objects.
    + *
    + * Both of these objects can be reused, in a manner similar to
    + * database connections, and the cost of creating them is relatively
    + * high, so the PulsarClientPool keeps these objects in pools
    + * for re-use.
    + *
    + * @author david
    --- End diff --
    
    Please remove.


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    Davids-MacBook-Pro:nifi david$ git rebase --continue
    Applying: Changed artifact versions to 1.7.0-SNAPSHOT
    Applying: Fixed issues identified during code review
    Applying: Removed invalid characters left over from merge
    Using index info to reconstruct a base tree...
    M	nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/pom.xml
    M	nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java
    Falling back to patching base and 3-way merge...
    No changes -- Patch already applied.
    Davids-MacBook-Pro:nifi david$ git status
    On branch NIFI-4914
    Your branch and 'origin/NIFI-4914' have diverged,
    and have 210 and 217 different commits each, respectively.
      (use "git pull" to merge the remote branch into yours)


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    @MikeThomsen just wanted to clarify the pointers here. Did we break an API that existed prior to the 1.6 release that we should not have, or was this a newer thing being leveraged that just happened not to settle until the release itself and thus requires a little rework in the PR? Just want to make sure what we're putting @david-streamlio through here (and thanks for sticking with it david) is isolated.


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183234779
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
    +import org.apache.nifi.pulsar.PulsarClientPool;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +
    +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@CapabilityDescription("Consumes messages from Apache Pulsar "
    +        + "The complementary NiFi processor for sending messages is PublishPulsar.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor {
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +        properties.add(MAX_WAIT_TIME);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        try {
    +            if (context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +                // Launch consumers
    +                consumeAsync(context, session);
    +
    +                // Handle completed consumers
    +                handleAsync(context, session);
    +
    +            } else {
    +                consume(context, session);
    +            }
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +
    +    }
    +
    +    private void handleAsync(ProcessContext context, ProcessSession session) {
    +
    +        try {
    +            Future<Message> done = consumerService.take();
    +            Message msg = done.get();
    +
    +            if (msg != null) {
    +                FlowFile flowFile = null;
    +                final byte[] value = msg.getData();
    +                if (value != null && value.length > 0) {
    +                    flowFile = session.create();
    +                    flowFile = session.write(flowFile, out -> {
    +                        out.write(value);
    +                    });
    +
    +                   session.getProvenanceReporter().receive(flowFile, "From " + getWrappedConsumer(context).getTransitURL());
    +                   session.transfer(flowFile, REL_SUCCESS);
    +                   session.commit();
    +                   getWrappedConsumer(context).getConsumer().acknowledgeAsync(msg);
    +                }
    +            }
    +
    +        } catch (InterruptedException | ExecutionException | PulsarClientException e) {
    +            getLogger().error("Trouble consuming messages ", e);
    +        }
    +
    +    }
    +
    +    @OnStopped
    +    public void close(final ProcessContext context) {
    +
    +        getLogger().info("Disconnecting Pulsar Consumer");
    --- End diff --
    
    I think this would be better off as:
    
    ```
    if (getLogger().isDebugEnabled()) {
        getLogger().debug(...
    }
    ```


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    Pretty sure we found the culprit:
    
    > Davids-MacBook-Pro:nifi david$ git rebase
    
    That should be `git rebase master`. Not sure what git thinks it's doing, but it's clearly not doing what you want it to do. Give that a shot and let us know what you get.


---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183234572
  
    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
    +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.function.BiConsumer;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +
    +@CapabilityDescription("Consumes messages from Apache Pulsar specifically built against the Pulsar 1.x Consumer API. "
    +        + "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_X. Please note that, at this time, "
    +        + "the Processor assumes that all records that are retrieved from a given partition have the same schema. If any "
    +        + "of the Pulsar messages that are pulled cannot be parsed or written with the configured Record Reader or "
    +        + "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the "
    +        + "'parse_failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual "
    +        + "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the "
    +        + "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.")
    +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "record.count", description = "The number of records received")
    +})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, PublishPulsarRecord_1_X.class})
    +public class ConsumePulsarRecord_1_X extends AbstractPulsarConsumerProcessor {
    +
    +    public static final String MSG_COUNT = "record.count";
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .description("The maximum number of messages to combine into a single FlowFile.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
    +            .name("parse_failure")
    +            .description("FlowFiles for which the content was not parsable")
    +            .build();
    +
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(RECORD_READER);
    +        properties.add(RECORD_WRITER);
    +        properties.add(TOPIC);
    +        properties.add(SUBSCRIPTION);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +        properties.add(BATCH_SIZE);
    +        properties.add(MAX_WAIT_TIME);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_PARSE_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER)
    +                .asControllerService(RecordReaderFactory.class);
    +
    +        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER)
    +                .asControllerService(RecordSetWriterFactory.class);
    +
    +        final boolean asyncEnabled = context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean();
    +
    +        List<Message> messages = null;
    +        try {
    +            if (asyncEnabled) {
    +                // Launch consumers
    +                consumeAsync(context, session);
    +
    +                // Handle completed consumers
    +                messages = handleAsync(context, session);
    +
    +            } else {
    +                messages = consume(context, session);
    +            }
    +
    +            processMessages(context, session, messages, readerFactory, writerFactory, asyncEnabled);
    +
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +
    +    }
    +
    +    /**
    +     * Pull messages off of the topic until we have reached BATCH_SIZE or MAX_WAIT_TIME,
    +     * whichever occurs first.
    +     */
    +    private List<Message> consume(ProcessContext context, ProcessSession session) throws PulsarClientException {
    +
    +        // MAX_WAIT_TIME is compared against elapsed wall-clock milliseconds below, so convert it to the same unit
    +        final int queryTimeout = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
    +
    +        Consumer consumer = getWrappedConsumer(context).getConsumer();
    +
    +        List<Message> messages = new ArrayList<Message>();
    +
    +        long startTime = System.currentTimeMillis();
    +
    +        while ( (messages.size() < context.getProperty(BATCH_SIZE).asInteger())
    +                && (queryTimeout == 0 || System.currentTimeMillis() - startTime < queryTimeout ) ) {
    +            messages.add(consumer.receive());
    +        }
    +
    +        return messages;
    +    }
    +
    +    private void processMessages(ProcessContext context, ProcessSession session, List<Message> messages, RecordReaderFactory readerFactory,
    +        RecordSetWriterFactory writerFactory, boolean async) throws PulsarClientException {
    +
    +        if (messages.isEmpty()) {
    +            return;
    +        }
    +
    +        final AtomicLong messagesReceived = new AtomicLong(0L);
    +
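    +        // On a parse failure, write the raw message bytes to a new FlowFile so the data is not silently dropped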
    +        final BiConsumer<Message, Exception> handleParseFailure = (msg, e) -> {
    +            FlowFile failureFlowFile = session.create();
    +            if (msg.getData() != null) {
    +               failureFlowFile = session.write(failureFlowFile, out -> out.write(msg.getData()));
    +            }
    +            session.transfer(failureFlowFile, REL_PARSE_FAILURE);
    +        };
    +
    +        RecordSetWriter writer = null;
    +        FlowFile flowFile = null;
    +        OutputStream rawOut = null;
    +
    +        for (Message msg: messages)  {
    +            RecordReader reader = getRecordReader(msg, readerFactory, handleParseFailure);
    +
    +            // A null reader means the message could not be parsed; it has already been routed to 'parse_failure'
    +            if (reader == null) {
    +                continue;
    +            }
    +
    +            Record firstRecord = getFirstRecord(msg, reader, handleParseFailure);
    +
    +            if (firstRecord == null) {
    +                // If the message doesn't contain any record, just ack the message
    +                if (async) {
    +                   ackService.submit(new Callable<Void>() {
    +                      @Override
    +                      public Void call() throws Exception {
    +                         return getWrappedConsumer(context).getConsumer().acknowledgeAsync(msg).get();
    +                      }
    +                   });
    +                } else {
    +                   getWrappedConsumer(context).getConsumer().acknowledge(msg);
    +                }
    +
    +                continue;
    +            }
    +
    +            // Session / FlowFile starts here
    +            if (flowFile == null) {
    +                flowFile = session.create();
    +                rawOut = session.write(flowFile);
    +            }
    +
    +            // Create the Record Writer
    +            if (writer == null) {
    +                try {
    +                    writer = getRecordWriter(writerFactory, firstRecord.getSchema(), rawOut);
    +                    writer.beginRecordSet();
    +                } catch (SchemaNotFoundException | IOException ex) {
    +                    getLogger().error("Failed to obtain Schema for FlowFile.", ex);
    +                    throw new ProcessException(ex);
    +                }
    +            }
    +
    +            // Read all the records from this message, as it may contain several
    +            try {
    +                for (Record record = firstRecord; record != null; record = reader.nextRecord()) {
    +                    writer.write(record);
    +                    messagesReceived.incrementAndGet();
    +                }
    +            } catch (MalformedRecordException | IOException mEx) {
    +                handleParseFailure.accept(msg, mEx);
    +            }
    +
    +            // Acknowledge the message
    +            if (async) {
    +               ackService.submit(new Callable<Void>() {
    +                   @Override
    +                   public Void call() throws Exception {
    +                      return getWrappedConsumer(context).getConsumer().acknowledgeAsync(msg).get();
    +                   }
    +               });
    +            } else {
    +                getWrappedConsumer(context).getConsumer().acknowledge(msg);
    +            }
    +        }
    +
    +        // Clean-up and transfer session
    +        try {
    +            if (writer != null) {
    +                writer.finishRecordSet();
    +            }
    +
    +            if (rawOut != null) {
    +                rawOut.close();
    +            }
    +        } catch (IOException e1) {
    +            getLogger().error("Error cleaning up", e1);
    +        }
    +
    +        if (flowFile != null) {
    +           flowFile = session.putAttribute(flowFile, MSG_COUNT, messagesReceived.toString());
    +           session.transfer(flowFile, REL_SUCCESS);
    +        }
    +    }
    +
    +    private Record getFirstRecord(Message msg, RecordReader reader, BiConsumer<Message, Exception> handleParseFailure) {
    +
    +        Record firstRecord = null;
    +
    +        try {
    +            firstRecord = reader.nextRecord();
    +        } catch (IOException | MalformedRecordException ex) {
    +            handleParseFailure.accept(msg, ex);
    +        }
    +
    +        return firstRecord;
    +    }
    +
    +    private RecordReader getRecordReader(Message msg, RecordReaderFactory readerFactory, BiConsumer<Message, Exception> handleParseFailure) {
    +
    +        RecordReader reader = null;
    +        final byte[] recordBytes = msg.getData() == null ? new byte[0] : msg.getData();
    +
    +        try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
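    +            // ByteArrayInputStream.close() is a no-op, so the reader can keep consuming the stream after this block exits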
    +            reader = readerFactory.createRecordReader(Collections.emptyMap(), in, getLogger());
    +        } catch (MalformedRecordException | IOException | SchemaNotFoundException ex) {
    +            handleParseFailure.accept(msg, ex);
    +        }
    +
    +        return reader;
    +
    +    }
    +
    +    private RecordSetWriter getRecordWriter(RecordSetWriterFactory writerFactory, RecordSchema srcSchema, OutputStream out) throws SchemaNotFoundException, IOException {
    +        RecordSchema writeSchema = writerFactory.getSchema(Collections.emptyMap(), srcSchema);
    +        return writerFactory.createWriter(getLogger(), writeSchema, out);
    +    }
    +
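    +    /**
    +     * Drain completed asynchronous receives off of the completion service,
    +     * waiting up to MAX_WAIT_TIME for each poll.
    +     */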
    +    protected List<Message> handleAsync(ProcessContext context, ProcessSession session) {
    +
    +       List<Message> messages = new ArrayList<Message>();
    +       final Integer queryTimeout = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
    +
    +       try {
    +
    +            Future<Message> done = null;
    +            do {
    +                done = consumerService.poll(queryTimeout, TimeUnit.SECONDS);
    +
    +                if (done == null)
    --- End diff --
    
    Needs curly brackets.
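    
    e.g., assuming the intent is to stop polling once the poll times out:
    
    ```
    if (done == null) {
        break;
    }
    ```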


---

[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

Posted by david-streamlio <gi...@git.apache.org>.
Github user david-streamlio commented on the issue:

    https://github.com/apache/nifi/pull/2614
  
    I tend to agree with @joewitt on this... Let's start from a fresh master and add in the new nifi-nar-bundle. It shouldn't be this hard to add a completely fresh bundle, as the files are all net new and shouldn't have any upstream conflicts.
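    
    For example, branching off a freshly pulled apache master and copying the new `nifi-pulsar-bundle` directory onto it should sidestep the rebase entirely, since everything under that directory is additive.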


---