Posted to issues@nifi.apache.org by markap14 <gi...@git.apache.org> on 2016/10/04 20:53:30 UTC

[GitHub] nifi pull request #1097: NIFI-2865: Refactored PublishKafka and PublishKafka...

GitHub user markap14 opened a pull request:

    https://github.com/apache/nifi/pull/1097

    NIFI-2865: Refactored PublishKafka and PublishKafka_0_10 to allow batching of FlowFiles within a single publish and to let messages timeout if not acknowledged

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/markap14/nifi NIFI-2865

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/1097.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1097
    
----
commit 5b10a50ff34150c2642b544e4ee7a855c080b285
Author: Mark Payne <ma...@hotmail.com>
Date:   2016-09-08T23:37:35Z

    NIFI-2865: Refactored PublishKafka and PublishKafka_0_10 to allow batching of FlowFiles within a single publish and to let messages timeout if not acknowledged

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1097: NIFI-2865: Refactored PublishKafka and PublishKafka...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1097#discussion_r82194639
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.stream.io.exception.TokenTooLargeException;
    +import org.apache.nifi.stream.io.util.StreamDemarcator;
    +
    +public class PublisherLease implements Closeable {
    +    private final ComponentLog logger;
    +    private final Producer<byte[], byte[]> producer;
    +    private final int maxMessageSize;
    +    private final long maxAckWaitMillis;
    +    private volatile boolean poisoned = false;
    +
    +    private InFlightMessageTracker tracker;
    +
    +    public PublisherLease(final Producer<byte[], byte[]> producer, final int maxMessageSize, final long maxAckWaitMillis, final ComponentLog logger) {
    +        this.producer = producer;
    +        this.maxMessageSize = maxMessageSize;
    +        this.logger = logger;
    +        this.maxAckWaitMillis = maxAckWaitMillis;
    +    }
    +
    +    protected void poison() {
    +        this.poisoned = true;
    +    }
    +
    +    public boolean isPoisoned() {
    +        return poisoned;
    +    }
    +
    +    void publish(final FlowFile flowFile, final InputStream flowFileContent, final byte[] messageKey, final byte[] demarcatorBytes, final String topic) throws IOException {
    +        if (tracker == null) {
    --- End diff --
    
    We probably could... but I held off on doing that, because the lease may get poisoned in a background thread, and the caller may not know, when calling publish(), that it was poisoned. If that happens, the new FlowFile is likely to fail anyway. If it doesn't, that's okay - there's no reason it can't be published to Kafka at that point. We simply poison the lease to ensure that a new one gets created, in case there's an issue with the connection.
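    A minimal sketch of the pool side of that behavior, assuming PublisherPool keeps returned leases in a queue for reuse (the pool's code is not shown in this thread, so the class, field, and method names below are assumptions for illustration only). The point is simply that a poisoned lease is closed rather than reused, so the next obtainPublisher() call builds a fresh Producer:

        import java.util.concurrent.BlockingQueue;
        import java.util.concurrent.LinkedBlockingQueue;

        public class PublisherPoolSketch {
            // Leases are pooled for reuse across onTrigger invocations.
            private final BlockingQueue<PublisherLease> leases = new LinkedBlockingQueue<>();

            void returnLease(final PublisherLease lease) {
                if (lease.isPoisoned()) {
                    // Never reuse a lease whose underlying connection may be broken;
                    // closing it forces a new Producer to be created on the next request.
                    try {
                        lease.close();
                    } catch (final Exception e) {
                        // best effort - the producer is being discarded anyway
                    }
                } else {
                    leases.offer(lease);
                }
            }
        }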


---

[GitHub] nifi pull request #1097: NIFI-2865: Refactored PublishKafka and PublishKafka...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1097#discussion_r82070354
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java ---
    @@ -250,242 +241,141 @@
     
         @Override
         protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return DESCRIPTORS;
    +        return PROPERTIES;
         }
     
         @Override
         protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
             return new PropertyDescriptor.Builder()
    -                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
    -                .name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class)).dynamic(true)
    -                .build();
    +            .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
    +            .name(propertyDescriptorName)
    +            .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
    +            .dynamic(true)
    +            .build();
         }
     
         @Override
         protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
             return KafkaProcessorUtils.validateCommonProperties(validationContext);
         }
     
    -    volatile KafkaPublisher kafkaPublisher;
    -
    -    /**
    -     * This thread-safe operation will delegate to
    -     * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)} after first
    -     * checking and creating (if necessary) Kafka resource which could be either
    -     * {@link KafkaPublisher} or {@link KafkaConsumer}. It will also close and
    -     * destroy the underlying Kafka resource upon catching an {@link Exception}
    -     * raised by {@link #rendezvousWithKafka(ProcessContext, ProcessSession)}.
    -     * After Kafka resource is destroyed it will be re-created upon the next
    -     * invocation of this operation essentially providing a self healing
    -     * mechanism to deal with potentially corrupted resource.
    -     * <p>
    -     * Keep in mind that upon catching an exception the state of this processor
    -     * will be set to no longer accept any more tasks, until Kafka resource is
    -     * reset. This means that in a multi-threaded situation currently executing
    -     * tasks will be given a chance to complete while no new tasks will be
    -     * accepted.
    -     *
    -     * @param context context
    -     * @param sessionFactory factory
    -     */
    -    @Override
    -    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
    -        if (this.acceptTask) { // acts as a circuit breaker to allow existing tasks to wind down so 'kafkaPublisher' can be reset before new tasks are accepted.
    -            this.taskCounter.incrementAndGet();
    -            final ProcessSession session = sessionFactory.createSession();
    -            try {
    -                /*
    -                 * We can't be doing double null check here since as a pattern
    -                 * it only works for lazy init but not reset, which is what we
    -                 * are doing here. In fact the first null check is dangerous
    -                 * since 'kafkaPublisher' can become null right after its null
    -                 * check passed causing subsequent NPE.
    -                 */
    -                synchronized (this) {
    -                    if (this.kafkaPublisher == null) {
    -                        this.kafkaPublisher = this.buildKafkaResource(context, session);
    -                    }
    -                }
    -
    -                /*
    -                 * The 'processed' boolean flag does not imply any failure or success. It simply states that:
    -                 * - ConsumeKafka - some messages were received form Kafka and 1_ FlowFile were generated
    -                 * - PublishKafka0_10 - some messages were sent to Kafka based on existence of the input FlowFile
    -                 */
    -                boolean processed = this.rendezvousWithKafka(context, session);
    -                session.commit();
    -                if (!processed) {
    -                    context.yield();
    -                }
    -            } catch (Throwable e) {
    -                this.acceptTask = false;
    -                session.rollback(true);
    -                this.getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, e});
    -            } finally {
    -                synchronized (this) {
    -                    if (this.taskCounter.decrementAndGet() == 0 && !this.acceptTask) {
    -                        this.close();
    -                        this.acceptTask = true;
    -                    }
    -                }
    -            }
    -        } else {
    -            this.logger.debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset.");
    -            this.getLogger().debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset.");
    -            context.yield();
    +    private synchronized PublisherPool getPublisherPool(final ProcessContext context) {
    +        PublisherPool pool = publisherPool;
    +        if (pool != null) {
    +            return pool;
             }
    +
    +        return publisherPool = createPublisherPool(context);
    +    }
    +
    +    protected PublisherPool createPublisherPool(final ProcessContext context) {
    +        final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
    +        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
    +
    +        final Map<String, Object> kafkaProperties = new HashMap<>();
    +        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
    +        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    +        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    +        kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
    +
    +        return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis);
         }
     
    -    /**
    -     * Will call {@link Closeable#close()} on the target resource after which
    -     * the target resource will be set to null. Should only be called when there
    -     * are no more threads being executed on this processor or when it has been
    -     * verified that only a single thread remains.
    -     *
    -     * @see KafkaPublisher
    -     * @see KafkaConsumer
    -     */
         @OnStopped
    -    public void close() {
    -        try {
    -            if (this.kafkaPublisher != null) {
    -                try {
    -                    this.kafkaPublisher.close();
    -                } catch (Exception e) {
    -                    this.getLogger().warn("Failed while closing " + this.kafkaPublisher, e);
    -                }
    -            }
    -        } finally {
    -            this.kafkaPublisher = null;
    +    public void closePool() {
    +        if (publisherPool != null) {
    +            publisherPool.close();
             }
    +
    +        publisherPool = null;
         }
     
    -    /**
    -     * Will rendezvous with Kafka if {@link ProcessSession} contains
    -     * {@link FlowFile} producing a result {@link FlowFile}.
    -     * <br>
    -     * The result {@link FlowFile} that is successful is then transfered to
    -     * {@link #REL_SUCCESS}
    -     * <br>
    -     * The result {@link FlowFile} that is failed is then transfered to
    -     * {@link #REL_FAILURE}
    -     *
    -     */
    -    protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) {
    -        FlowFile flowFile = session.get();
    -        if (flowFile != null) {
    -            long start = System.nanoTime();
    -            flowFile = this.doRendezvousWithKafka(flowFile, context, session);
    -            Relationship relationship = REL_SUCCESS;
    -            if (!this.isFailedFlowFile(flowFile)) {
    -                String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    -                long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    -                String transitUri = KafkaProcessorUtils.buildTransitURI(context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(), this.brokers, topic);
    -                session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + flowFile.getAttribute(MSG_COUNT) + " Kafka messages", executionDuration);
    -                this.getLogger().debug("Successfully sent {} to Kafka as {} message(s) in {} millis",
    -                        new Object[]{flowFile, flowFile.getAttribute(MSG_COUNT), executionDuration});
    -            } else {
    -                relationship = REL_FAILURE;
    -                flowFile = session.penalize(flowFile);
    -            }
    -            session.transfer(flowFile, relationship);
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        final boolean useDemarcator = context.getProperty(MESSAGE_DEMARCATOR).isSet();
    +
    +        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 500));
    +        if (flowFiles.isEmpty()) {
    +            return;
             }
    -        return flowFile != null;
    -    }
     
    -    /**
    -     * Builds and instance of {@link KafkaPublisher}.
    -     */
    -    protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) {
    -        final Map<String, String> kafkaProps = new HashMap<>();
    -        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProps);
    -        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    -        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    -        kafkaProps.put("max.request.size", String.valueOf(context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue()));
    -        this.brokers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
    -        final Properties props = new Properties();
    -        props.putAll(kafkaProps);
    -        KafkaPublisher publisher = new KafkaPublisher(props, this.getLogger());
    -        return publisher;
    -    }
    +        final PublisherPool pool = getPublisherPool(context);
    +        if (pool == null) {
    +            context.yield();
    +            return;
    +        }
     
    -    /**
    -     * Will rendezvous with {@link KafkaPublisher} after building
    -     * {@link PublishingContext} and will produce the resulting
    -     * {@link FlowFile}. The resulting FlowFile contains all required
    -     * information to determine if message publishing originated from the
    -     * provided FlowFile has actually succeeded fully, partially or failed
    -     * completely (see {@link #isFailedFlowFile(FlowFile)}.
    -     */
    -    private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) {
    -        final AtomicReference<KafkaPublisher.KafkaPublisherResult> publishResultRef = new AtomicReference<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(InputStream contentStream) throws IOException {
    -                PublishingContext publishingContext = PublishKafka.this.buildPublishingContext(flowFile, context, contentStream);
    -                KafkaPublisher.KafkaPublisherResult result = PublishKafka.this.kafkaPublisher.publish(publishingContext);
    -                publishResultRef.set(result);
    +        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
    +        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
    +
    +        final long startTime = System.nanoTime();
    +        try (final PublisherLease lease = pool.obtainPublisher()) {
    +            // Send each FlowFile to Kafka asynchronously.
    +            for (final FlowFile flowFile : flowFiles) {
    +                if (!isScheduled()) {
    +                    // If stopped, re-queue FlowFile instead of sending it
    +                    session.transfer(flowFile);
    +                    continue;
    +                }
    +
    +                final byte[] messageKey = getMessageKey(flowFile, context);
    +                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    +                final byte[] demarcatorBytes;
    +                if (useDemarcator) {
    +                    demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8);
    +                } else {
    +                    demarcatorBytes = null;
    +                }
    +
    +                session.read(flowFile, new InputStreamCallback() {
    +                    @Override
    +                    public void process(final InputStream rawIn) throws IOException {
    +                        try (final InputStream in = new BufferedInputStream(rawIn)) {
    +                            lease.publish(flowFile, in, messageKey, demarcatorBytes, topic);
    +                        }
    +                    }
    +                });
                 }
    -        });
     
    -        FlowFile resultFile = publishResultRef.get().isAllAcked()
    -                ? this.cleanUpFlowFileIfNecessary(flowFile, session)
    -                : session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(), flowFile, context));
    +            // Complete the send
    +            final PublishResult publishResult = lease.complete();
     
    -        if (!this.isFailedFlowFile(resultFile)) {
    -            resultFile = session.putAttribute(resultFile, MSG_COUNT, String.valueOf(publishResultRef.get().getMessagesSent()));
    -        }
    -        return resultFile;
    -    }
    +            // Transfer any successful FlowFiles.
    +            final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
    +            for (FlowFile success : publishResult.getSuccessfulFlowFiles()) {
    +                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
     
    -    /**
    -     * Builds {@link PublishingContext} for message(s) to be sent to Kafka.
    -     * {@link PublishingContext} contains all contextual information required by
    -     * {@link KafkaPublisher} to publish to Kafka. Such information contains
    -     * things like topic name, content stream, delimiter, key and last ACKed
    -     * message for cases where provided FlowFile is being retried (failed in the
    -     * past).
    -     * <br>
    -     * For the clean FlowFile (file that has been sent for the first time),
    -     * PublishingContext will be built form {@link ProcessContext} associated
    -     * with this invocation.
    -     * <br>
    -     * For the failed FlowFile, {@link PublishingContext} will be built from
    -     * attributes of that FlowFile which by then will already contain required
    -     * information (e.g., topic, key, delimiter etc.). This is required to
    -     * ensure the affinity of the retry in the even where processor
    -     * configuration has changed. However keep in mind that failed FlowFile is
    -     * only considered a failed FlowFile if it is being re-processed by the same
    -     * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see
    -     * {@link #isFailedFlowFile(FlowFile)}). If failed FlowFile is being sent to
    -     * another PublishKafka0_10 processor it is treated as a fresh FlowFile
    -     * regardless if it has #FAILED* attributes set.
    -     */
    -    private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, InputStream contentStream) {
    -        final byte[] keyBytes = getMessageKey(flowFile, context);
    -
    -        final String topicName;
    -        final byte[] delimiterBytes;
    -        int lastAckedMessageIndex = -1;
    -        if (this.isFailedFlowFile(flowFile)) {
    -            lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
    -            topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
    -            delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null
    -                    ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null;
    -        } else {
    -            topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    -            delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR)
    -                    .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
    -        }
    +                final int msgCount = publishResult.getSuccessfulMessageCount(success);
    +                success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
    +                session.adjustCounter("Messages Sent", msgCount, true);
    +
    +                final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
    +                session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
    +                session.transfer(success, REL_SUCCESS);
    +            }
    +
    +            // Transfer any failures.
    +            for (final FlowFile failure : publishResult.getFailedFlowFiles()) {
    +                final int successCount = publishResult.getSuccessfulMessageCount(failure);
    +                if (successCount > 0) {
    +                    getLogger().error("Failed to send all message for {} to Kafka, but {} messages were acknowledged by Kafka. Routing to failure due to {}",
    --- End diff --
    
    Same as other comment in the PublishKafka_0_10


---

[GitHub] nifi pull request #1097: NIFI-2865: Refactored PublishKafka and PublishKafka...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1097#discussion_r82070115
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java ---
    @@ -249,242 +241,141 @@
     
         @Override
         protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return DESCRIPTORS;
    +        return PROPERTIES;
         }
     
         @Override
         protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
             return new PropertyDescriptor.Builder()
    -                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
    -                .name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class)).dynamic(true)
    -                .build();
    +            .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
    +            .name(propertyDescriptorName)
    +            .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
    +            .dynamic(true)
    +            .build();
         }
     
         @Override
         protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
             return KafkaProcessorUtils.validateCommonProperties(validationContext);
         }
     
    -    volatile KafkaPublisher kafkaPublisher;
    -
    -    /**
    -     * This thread-safe operation will delegate to
    -     * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)} after first
    -     * checking and creating (if necessary) Kafka resource which could be either
    -     * {@link KafkaPublisher} or {@link KafkaConsumer}. It will also close and
    -     * destroy the underlying Kafka resource upon catching an {@link Exception}
    -     * raised by {@link #rendezvousWithKafka(ProcessContext, ProcessSession)}.
    -     * After Kafka resource is destroyed it will be re-created upon the next
    -     * invocation of this operation essentially providing a self healing
    -     * mechanism to deal with potentially corrupted resource.
    -     * <p>
    -     * Keep in mind that upon catching an exception the state of this processor
    -     * will be set to no longer accept any more tasks, until Kafka resource is
    -     * reset. This means that in a multi-threaded situation currently executing
    -     * tasks will be given a chance to complete while no new tasks will be
    -     * accepted.
    -     *
    -     * @param context context
    -     * @param sessionFactory factory
    -     */
    -    @Override
    -    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
    -        if (this.acceptTask) { // acts as a circuit breaker to allow existing tasks to wind down so 'kafkaPublisher' can be reset before new tasks are accepted.
    -            this.taskCounter.incrementAndGet();
    -            final ProcessSession session = sessionFactory.createSession();
    -            try {
    -                /*
    -                 * We can't be doing double null check here since as a pattern
    -                 * it only works for lazy init but not reset, which is what we
    -                 * are doing here. In fact the first null check is dangerous
    -                 * since 'kafkaPublisher' can become null right after its null
    -                 * check passed causing subsequent NPE.
    -                 */
    -                synchronized (this) {
    -                    if (this.kafkaPublisher == null) {
    -                        this.kafkaPublisher = this.buildKafkaResource(context, session);
    -                    }
    -                }
    -
    -                /*
    -                 * The 'processed' boolean flag does not imply any failure or success. It simply states that:
    -                 * - ConsumeKafka - some messages were received form Kafka and 1_ FlowFile were generated
    -                 * - PublishKafka0_10 - some messages were sent to Kafka based on existence of the input FlowFile
    -                 */
    -                boolean processed = this.rendezvousWithKafka(context, session);
    -                session.commit();
    -                if (!processed) {
    -                    context.yield();
    -                }
    -            } catch (Throwable e) {
    -                this.acceptTask = false;
    -                session.rollback(true);
    -                this.getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, e});
    -            } finally {
    -                synchronized (this) {
    -                    if (this.taskCounter.decrementAndGet() == 0 && !this.acceptTask) {
    -                        this.close();
    -                        this.acceptTask = true;
    -                    }
    -                }
    -            }
    -        } else {
    -            this.logger.debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset.");
    -            this.getLogger().debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset.");
    -            context.yield();
    +    private synchronized PublisherPool getPublisherPool(final ProcessContext context) {
    +        PublisherPool pool = publisherPool;
    +        if (pool != null) {
    +            return pool;
             }
    +
    +        return publisherPool = createPublisherPool(context);
    +    }
    +
    +    protected PublisherPool createPublisherPool(final ProcessContext context) {
    +        final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
    +        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
    +
    +        final Map<String, Object> kafkaProperties = new HashMap<>();
    +        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
    +        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    +        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    +        kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
    +
    +        return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis);
         }
     
    -    /**
    -     * Will call {@link Closeable#close()} on the target resource after which
    -     * the target resource will be set to null. Should only be called when there
    -     * are no more threads being executed on this processor or when it has been
    -     * verified that only a single thread remains.
    -     *
    -     * @see KafkaPublisher
    -     * @see KafkaConsumer
    -     */
         @OnStopped
    -    public void close() {
    -        try {
    -            if (this.kafkaPublisher != null) {
    -                try {
    -                    this.kafkaPublisher.close();
    -                } catch (Exception e) {
    -                    this.getLogger().warn("Failed while closing " + this.kafkaPublisher, e);
    -                }
    -            }
    -        } finally {
    -            this.kafkaPublisher = null;
    +    public void closePool() {
    +        if (publisherPool != null) {
    +            publisherPool.close();
             }
    +
    +        publisherPool = null;
         }
     
    -    /**
    -     * Will rendezvous with Kafka if {@link ProcessSession} contains
    -     * {@link FlowFile} producing a result {@link FlowFile}.
    -     * <br>
    -     * The result {@link FlowFile} that is successful is then transfered to
    -     * {@link #REL_SUCCESS}
    -     * <br>
    -     * The result {@link FlowFile} that is failed is then transfered to
    -     * {@link #REL_FAILURE}
    -     *
    -     */
    -    protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) {
    -        FlowFile flowFile = session.get();
    -        if (flowFile != null) {
    -            long start = System.nanoTime();
    -            flowFile = this.doRendezvousWithKafka(flowFile, context, session);
    -            Relationship relationship = REL_SUCCESS;
    -            if (!this.isFailedFlowFile(flowFile)) {
    -                String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    -                long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    -                String transitUri = KafkaProcessorUtils.buildTransitURI(context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(), this.brokers, topic);
    -                session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + flowFile.getAttribute(MSG_COUNT) + " Kafka messages", executionDuration);
    -                this.getLogger().debug("Successfully sent {} to Kafka as {} message(s) in {} millis",
    -                        new Object[]{flowFile, flowFile.getAttribute(MSG_COUNT), executionDuration});
    -            } else {
    -                relationship = REL_FAILURE;
    -                flowFile = session.penalize(flowFile);
    -            }
    -            session.transfer(flowFile, relationship);
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        final boolean useDemarcator = context.getProperty(MESSAGE_DEMARCATOR).isSet();
    +
    +        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 500));
    +        if (flowFiles.isEmpty()) {
    +            return;
             }
    -        return flowFile != null;
    -    }
     
    -    /**
    -     * Builds and instance of {@link KafkaPublisher}.
    -     */
    -    protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) {
    -        final Map<String, String> kafkaProps = new HashMap<>();
    -        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProps);
    -        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    -        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    -        kafkaProps.put("max.request.size", String.valueOf(context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue()));
    -        this.brokers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
    -        final Properties props = new Properties();
    -        props.putAll(kafkaProps);
    -        KafkaPublisher publisher = new KafkaPublisher(props, this.getLogger());
    -        return publisher;
    -    }
    +        final PublisherPool pool = getPublisherPool(context);
    +        if (pool == null) {
    +            context.yield();
    +            return;
    +        }
    +
    +        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
    +        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
    +
    +        final long startTime = System.nanoTime();
    +        try (final PublisherLease lease = pool.obtainPublisher()) {
    +            // Send each FlowFile to Kafka asynchronously.
    +            for (final FlowFile flowFile : flowFiles) {
    +                if (!isScheduled()) {
    +                    // If stopped, re-queue FlowFile instead of sending it
    +                    session.transfer(flowFile);
    +                    continue;
    +                }
     
    -    /**
    -     * Will rendezvous with {@link KafkaPublisher} after building
    -     * {@link PublishingContext} and will produce the resulting
    -     * {@link FlowFile}. The resulting FlowFile contains all required
    -     * information to determine if message publishing originated from the
    -     * provided FlowFile has actually succeeded fully, partially or failed
    -     * completely (see {@link #isFailedFlowFile(FlowFile)}.
    -     */
    -    private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) {
    -        final AtomicReference<KafkaPublisher.KafkaPublisherResult> publishResultRef = new AtomicReference<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(InputStream contentStream) throws IOException {
    -                PublishingContext publishingContext = PublishKafka_0_10.this.buildPublishingContext(flowFile, context, contentStream);
    -                KafkaPublisher.KafkaPublisherResult result = PublishKafka_0_10.this.kafkaPublisher.publish(publishingContext);
    -                publishResultRef.set(result);
    +                final byte[] messageKey = getMessageKey(flowFile, context);
    +                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    +                final byte[] demarcatorBytes;
    +                if (useDemarcator) {
    +                    demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8);
    +                } else {
    +                    demarcatorBytes = null;
    +                }
    +
    +                session.read(flowFile, new InputStreamCallback() {
    +                    @Override
    +                    public void process(final InputStream rawIn) throws IOException {
    +                        try (final InputStream in = new BufferedInputStream(rawIn)) {
    +                            lease.publish(flowFile, in, messageKey, demarcatorBytes, topic);
    +                        }
    +                    }
    +                });
                 }
    -        });
     
    -        FlowFile resultFile = publishResultRef.get().isAllAcked()
    -                ? this.cleanUpFlowFileIfNecessary(flowFile, session)
    -                : session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(), flowFile, context));
    +            // Complete the send
    +            final PublishResult publishResult = lease.complete();
     
    -        if (!this.isFailedFlowFile(resultFile)) {
    -            resultFile = session.putAttribute(resultFile, MSG_COUNT, String.valueOf(publishResultRef.get().getMessagesSent()));
    -        }
    -        return resultFile;
    -    }
    +            // Transfer any successful FlowFiles.
    +            final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
    +            for (FlowFile success : publishResult.getSuccessfulFlowFiles()) {
    +                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
     
    -    /**
    -     * Builds {@link PublishingContext} for message(s) to be sent to Kafka.
    -     * {@link PublishingContext} contains all contextual information required by
    -     * {@link KafkaPublisher} to publish to Kafka. Such information contains
    -     * things like topic name, content stream, delimiter, key and last ACKed
    -     * message for cases where provided FlowFile is being retried (failed in the
    -     * past).
    -     * <br>
    -     * For the clean FlowFile (file that has been sent for the first time),
    -     * PublishingContext will be built form {@link ProcessContext} associated
    -     * with this invocation.
    -     * <br>
    -     * For the failed FlowFile, {@link PublishingContext} will be built from
    -     * attributes of that FlowFile which by then will already contain required
    -     * information (e.g., topic, key, delimiter etc.). This is required to
    -     * ensure the affinity of the retry in the even where processor
    -     * configuration has changed. However keep in mind that failed FlowFile is
    -     * only considered a failed FlowFile if it is being re-processed by the same
    -     * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see
    -     * {@link #isFailedFlowFile(FlowFile)}). If failed FlowFile is being sent to
    -     * another PublishKafka0_10 processor it is treated as a fresh FlowFile
    -     * regardless if it has #FAILED* attributes set.
    -     */
    -    private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, InputStream contentStream) {
    -        final byte[] keyBytes = getMessageKey(flowFile, context);
    -
    -        final String topicName;
    -        final byte[] delimiterBytes;
    -        int lastAckedMessageIndex = -1;
    -        if (this.isFailedFlowFile(flowFile)) {
    -            lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
    -            topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
    -            delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null
    -                    ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null;
    -        } else {
    -            topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    -            delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR)
    -                    .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
    -        }
    +                final int msgCount = publishResult.getSuccessfulMessageCount(success);
    +                success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
    +                session.adjustCounter("Messages Sent", msgCount, true);
    +
    +                final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
    +                session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
    +                session.transfer(success, REL_SUCCESS);
    +            }
    +
    +            // Transfer any failures.
    +            for (final FlowFile failure : publishResult.getFailedFlowFiles()) {
    +                final int successCount = publishResult.getSuccessfulMessageCount(failure);
    +                if (successCount > 0) {
    +                    getLogger().error("Failed to send all message for {} to Kafka, but {} messages were acknowledged by Kafka. Routing to failure due to {}",
    --- End diff --
    
    This logging statement looks like it was meant to include successCount, since the message has 3 placeholders but only 2 arguments are passed in. It might also read better if it said "Failed to send some messages for {} to Kafka, {} messages were acknowledged" or something like that.
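    For reference, a corrected call along the lines suggested might look like the following; the "cause" argument is hypothetical, since the arguments actually passed at that point in the PR are not visible in this excerpt:

        getLogger().error("Failed to send some messages for {} to Kafka, but {} messages were acknowledged by Kafka. Routing to failure due to {}",
                new Object[] {failure, successCount, cause});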


---

[GitHub] nifi issue #1097: NIFI-2865: Refactored PublishKafka and PublishKafka_0_10 t...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/1097
  
    Reviewing...


---

[GitHub] nifi pull request #1097: NIFI-2865: Refactored PublishKafka and PublishKafka...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1097#discussion_r82078303
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.stream.io.exception.TokenTooLargeException;
    +import org.apache.nifi.stream.io.util.StreamDemarcator;
    +
    +public class PublisherLease implements Closeable {
    +    private final ComponentLog logger;
    +    private final Producer<byte[], byte[]> producer;
    +    private final int maxMessageSize;
    +    private final long maxAckWaitMillis;
    +    private volatile boolean poisoned = false;
    +
    +    private InFlightMessageTracker tracker;
    +
    +    public PublisherLease(final Producer<byte[], byte[]> producer, final int maxMessageSize, final long maxAckWaitMillis, final ComponentLog logger) {
    +        this.producer = producer;
    +        this.maxMessageSize = maxMessageSize;
    +        this.logger = logger;
    +        this.maxAckWaitMillis = maxAckWaitMillis;
    +    }
    +
    +    protected void poison() {
    +        this.poisoned = true;
    +    }
    +
    +    public boolean isPoisoned() {
    +        return poisoned;
    +    }
    +
    +    void publish(final FlowFile flowFile, final InputStream flowFileContent, final byte[] messageKey, final byte[] demarcatorBytes, final String topic) throws IOException {
    +        if (tracker == null) {
    --- End diff --
    
    Not sure if this is necessary, but would we want to check whether the lease is poisoned here and throw an exception if publish() is called after it has been poisoned?
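    If such a guard were added, it might look something like this at the top of publish() - a sketch of the suggestion only, not code from the PR (markap14's reply elsewhere in this thread explains why it was left out):

        if (isPoisoned()) {
            throw new IOException("Cannot publish " + flowFile + " to Kafka because this lease has been poisoned");
        }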


---

[GitHub] nifi pull request #1097: NIFI-2865: Refactored PublishKafka and PublishKafka...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/1097


---