You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nifi.apache.org by olegz <gi...@git.apache.org> on 2016/04/19 22:40:16 UTC

[GitHub] nifi pull request: NIFI-1296 implemented new Kafka processors that...

GitHub user olegz opened a pull request:

    https://github.com/apache/nifi/pull/366

    NIFI-1296 implemented new  Kafka processors that leverage Kafka 0.9 API

    NIFI-1296 added failure handling logic to ensure both processors can be reset to their initial state (as if they were just started)
    
    NIFI-1296 added initial lifecycle test for AbstractKafkaProcessor
    
    NIFI-1296 added more tests
    
    NIFI-1296 final polishing

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/olegz/nifi NIFI-1296

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/366.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #366
    
----
commit b552b5fe449e599944b606253b01f3ca42ab1f15
Author: Oleg Zhurakousky <ol...@suitcase.io>
Date:   2016-04-07T11:15:25Z

    NIFI-1296 implemented new  Kafka processors that leverage Kafka 0.9 API
    
    NIFI-1296 added failure handling logic to ensure both processors can be reset to their initial state (as if they were just started)
    
    NIFI-1296 added initial lifecycle test for AbstractKafkaProcessor
    
    NIFI-1296 added more tests
    
    NIFI-1296 final polishing

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIF...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-219832281
  
    Thanks @pvillard31! Regarding @joewitt's comment, yes it is still an outstanding issue since we need to modify ReflectionUtils to allow the invocation of non-public methods. So feel free to raise JIRA, but I am also interested what @markap14 thinks about it (allowing ReflctionUtils.invoke*** to invoke non-public methods). In any event, the close() now is implemented with required safety in place to ensure idempotency, so not a pressing issue.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60829220
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.Closeable;
    +import java.lang.reflect.Field;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.MockProcessSession;
    +import org.apache.nifi.util.MockSessionFactory;
    +import org.apache.nifi.util.SharedSessionState;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Test;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +public class AbstractKafkaProcessorLifecycelTest {
    +
    +    @Test
    +    public void validateBaseProperties() throws Exception {
    +        TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class);
    +        runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
    +        runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
    +
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid"));
    +        }
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
    +
    +        runner.removeProperty(ConsumeKafka.TOPIC);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'topic' is invalid because topic is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.TOPIC, "blah");
    +
    +        runner.removeProperty(ConsumeKafka.CLIENT_ID);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("invalid because client.id is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "   ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
    +
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +    }
    +
    +    /*
    +     * The goal of this test is to validate the correctness of
    +     * AbstractKafkaProcessor's implementation of onTrigger() in a highly
    +     * concurrent environment. That is:
    +     * - upon processing failures (e.g., unhandled exceptions), the target Kafka
    +     *   resource is reset (closed and re-created)
    +     * - no FlowFile is unaccounted for. FlowFiles left in the queue and FlowFiles
    +     *   in Success relationship = testCount
    +     * - failed executions that did not result in the call to close/reset summed with
    +     *   verified calls to close should equal total request failed
    +     */
    +    @Test
    +    public void validateLifecycleCorrectnessWithProcessingFailures() throws Exception {
    --- End diff --
    
    @pvillard31 Ok, can you please try doing the same via ```-D``` on the command line?
    For example;
    ```
    mvn clean installl -Dfile.encoding=UTF-8
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60766260
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/AbstractKafkaProcessor.java ---
    @@ -0,0 +1,125 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka;
    +
    +import java.io.Closeable;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.exception.ProcessException;
    +
    +/**
    + * Base class for {@link Processor}s to publish and consume messages from Kafka
    + *
    + * @see PutKafka
    + */
    +abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessionFactoryProcessor {
    +
    +
    +    private volatile boolean acceptTask = true;
    +
    +    private final AtomicInteger taskCounter = new AtomicInteger();
    +
    +
    +    /**
    +     * @see KafkaPublisher
    +     */
    +    volatile T kafkaResource;
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
    +        final ProcessSession session = sessionFactory.createSession();
    +        if (this.acceptTask) {
    +            try {
    +                this.taskCounter.incrementAndGet();
    +                if (this.kafkaResource == null) {
    +                    synchronized (this) {
    +                        if (this.kafkaResource == null) {
    +                            this.kafkaResource = this.buildKafkaResource(context, session);
    +                        }
    +                    }
    +                }
    +
    +                this.rendezvousWithKafka(context, session);
    +                session.commit();
    +            } catch (Throwable e) {
    +                this.acceptTask = false;
    +                this.getLogger().error("{} failed to process due to {}; rolling back session", new Object[] { this, e });
    +                session.rollback(true);
    +            } finally {
    +                this.resetProcessorIfNecessary();
    +            }
    +        } else {
    +            context.yield();
    +        }
    +    }
    +    /**
    +     * Resets the processor to initial state if necessary. The necessary state
    +     * means there are no active task which could only happen if currently
    +     * executing tasks are given a chance to finish while no new tasks are
    +     * accepted (see {@link #acceptTask}
    +     */
    +    private boolean resetProcessorIfNecessary() {
    +        boolean reset = this.taskCounter.decrementAndGet() == 0 && !this.acceptTask;
    +        if (reset) {
    +            this.close();
    +            this.acceptTask = true;
    +        }
    +        return reset;
    +    }
    +
    +    /**
    +     * Will call {@link Closeable#close()} on the target resource after which
    +     * the target resource will be set to null
    +     *
    +     * @see KafkaPublisher
    +     */
    +    @OnStopped
    +    public void close() {
    --- End diff --
    
    @olegz could we reduce the scope here?  The concern I have is that technically this is a check-then-modify scenario and it could be called by both internal processor threads executing resetProcessorIfNecessary and whenever the framework is calling onStopped.  Now, by the way the framework works onStopped should only be called once there are no threads left so technically this should be legit.  Just would not want someone to also call this from other places in the code if we can avoid it.  So recommend we reduce scope if possible and in either case document the fact that this should not be called by others.  My comments are critical as I think practically speaking this is solid but there is a tiny window of concern..


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60828541
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java ---
    @@ -0,0 +1,213 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.kafka.clients.consumer.ConsumerConfig;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("Consumes messages from Apache Kafka")
    +@Tags({ "Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume" })
    +public class ConsumeKafka extends AbstractKafkaProcessor<KafkaConsumer<byte[], byte[]>> {
    +
    +    static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
    +
    +    static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
    +
    +    static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
    +
    +    static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
    +            .name(ConsumerConfig.GROUP_ID_CONFIG)
    +            .displayName("Group ID")
    +            .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .expressionLanguageSupported(false)
    +            .build();
    +    static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
    +            .name(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
    +            .displayName("Offset Reset")
    +            .description("Allows you to manage teh condition when there is no initial offset in Kafka or if the current offset does not exist any "
    +                    + "more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.")
    +            .required(true)
    +            .allowableValues(OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE)
    +            .defaultValue(OFFSET_LATEST.getValue())
    +            .build();
    +    static final PropertyDescriptor MESSAGE_DEMARCATOR = MESSAGE_DEMARCATOR_BUILDER
    +            .description("Since KafkaConsumer receives messages in batches, this property allows you to provide a string (interpreted as UTF-8) to use "
    +                    + "for demarcating apart multiple Kafka messages when building a single FlowFile. If not specified, all messages received form Kafka "
    +                            + "will be merged into a single content within a FlowFile. By default it will use 'new line' charcater to demarcate individual messages."
    +                            + "To enter special character such as 'new line' use Shift+Enter.")
    +            .defaultValue("\n")
    +            .build();
    +
    +
    +    static final List<PropertyDescriptor> descriptors;
    +
    +    static final Set<Relationship> relationships;
    +
    +    private volatile byte[] demarcatorBytes;
    +
    +    private volatile String topic;
    +
    +    /*
    +     * Will ensure that list of PropertyDescriptors is build only once, since
    +     * all other lifecycle methods are invoked multiple times.
    +     */
    +    static {
    +        List<PropertyDescriptor> _descriptors = new ArrayList<>();
    +        _descriptors.addAll(sharedDescriptors);
    +        _descriptors.add(GROUP_ID);
    +        _descriptors.add(AUTO_OFFSET_RESET);
    +        _descriptors.add(MESSAGE_DEMARCATOR);
    +        descriptors = Collections.unmodifiableList(_descriptors);
    +
    +        relationships = Collections.unmodifiableSet(sharedRelationships);
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    /**
    +     * Will unsubscribe form {@link KafkaConsumer} delegating to 'super' to do
    +     * the rest.
    +     */
    +    @Override
    +    @OnStopped
    +    public void close() {
    +        if (this.kafkaResource != null) {
    +            try {
    +                this.kafkaResource.unsubscribe();
    +            } finally { // in the event the above fails
    +                super.close();
    +            }
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    /**
    +     * Will rendezvous with Kafka by performing the following:
    +     * <br>
    +     * - poll {@link ConsumerRecords} from {@link KafkaConsumer} in a
    +     * non-blocking manner, signaling yield if no records were received from
    +     * Kafka
    +     * <br>
    +     * - if records were received form Kafka, the are written to a newly created
    +     * {@link FlowFile}'s {@link OutputStream} using a provided demarcator (see
    +     * {@link #MESSAGE_DEMARCATOR}
    +     */
    +    @Override
    +    protected void rendezvousWithKafka(ProcessContext context, ProcessSession processSession) throws ProcessException {
    +        ConsumerRecords<byte[], byte[]> consumedRecords = this.kafkaResource.poll(100);
    +        if (consumedRecords != null && !consumedRecords.isEmpty()) {
    +            long start = System.nanoTime();
    +            FlowFile flowFile = processSession.create();
    +            final AtomicInteger messageCounter = new AtomicInteger();
    +
    +            for (final ConsumerRecord<byte[], byte[]> consumedRecord : consumedRecords) {
    +                flowFile = processSession.append(flowFile, new OutputStreamCallback() {
    +                    @Override
    +                    public void process(final OutputStream out) throws IOException {
    +                        if (messageCounter.getAndIncrement() > 0) {
    +                            out.write(ConsumeKafka.this.demarcatorBytes);
    +                        }
    +                        out.write(consumedRecord.value());
    +                    }
    +                });
    +            }
    +
    +            this.releaseFlowFile(flowFile, processSession, flowFile.getAttributes(), start, messageCounter.get() - 1);
    --- End diff --
    
    When testing in a simple case (no demarcator), receive provenance events are always showing "Received 0 Kafka messages".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIF...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-219837303
  
    @andrewmlim yep some of it has already been addressed, but I'd stay away from munging yet another JIRA to this one. So good point and will be reviewed and addressed once this one is merged. Do you guys agree?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60828896
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java ---
    @@ -0,0 +1,213 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.kafka.clients.consumer.ConsumerConfig;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("Consumes messages from Apache Kafka")
    +@Tags({ "Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume" })
    +public class ConsumeKafka extends AbstractKafkaProcessor<KafkaConsumer<byte[], byte[]>> {
    +
    +    static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
    +
    +    static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
    +
    +    static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
    +
    +    static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
    +            .name(ConsumerConfig.GROUP_ID_CONFIG)
    +            .displayName("Group ID")
    +            .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .expressionLanguageSupported(false)
    +            .build();
    +    static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
    +            .name(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
    +            .displayName("Offset Reset")
    +            .description("Allows you to manage teh condition when there is no initial offset in Kafka or if the current offset does not exist any "
    +                    + "more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.")
    +            .required(true)
    +            .allowableValues(OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE)
    +            .defaultValue(OFFSET_LATEST.getValue())
    +            .build();
    +    static final PropertyDescriptor MESSAGE_DEMARCATOR = MESSAGE_DEMARCATOR_BUILDER
    +            .description("Since KafkaConsumer receives messages in batches, this property allows you to provide a string (interpreted as UTF-8) to use "
    +                    + "for demarcating apart multiple Kafka messages when building a single FlowFile. If not specified, all messages received form Kafka "
    +                            + "will be merged into a single content within a FlowFile. By default it will use 'new line' charcater to demarcate individual messages."
    +                            + "To enter special character such as 'new line' use Shift+Enter.")
    +            .defaultValue("\n")
    +            .build();
    +
    +
    +    static final List<PropertyDescriptor> descriptors;
    +
    +    static final Set<Relationship> relationships;
    +
    +    private volatile byte[] demarcatorBytes;
    +
    +    private volatile String topic;
    +
    +    /*
    +     * Will ensure that list of PropertyDescriptors is build only once, since
    +     * all other lifecycle methods are invoked multiple times.
    +     */
    +    static {
    +        List<PropertyDescriptor> _descriptors = new ArrayList<>();
    +        _descriptors.addAll(sharedDescriptors);
    +        _descriptors.add(GROUP_ID);
    +        _descriptors.add(AUTO_OFFSET_RESET);
    +        _descriptors.add(MESSAGE_DEMARCATOR);
    +        descriptors = Collections.unmodifiableList(_descriptors);
    +
    +        relationships = Collections.unmodifiableSet(sharedRelationships);
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    /**
    +     * Will unsubscribe form {@link KafkaConsumer} delegating to 'super' to do
    +     * the rest.
    +     */
    +    @Override
    +    @OnStopped
    +    public void close() {
    +        if (this.kafkaResource != null) {
    +            try {
    +                this.kafkaResource.unsubscribe();
    +            } finally { // in the event the above fails
    +                super.close();
    +            }
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    /**
    +     * Will rendezvous with Kafka by performing the following:
    +     * <br>
    +     * - poll {@link ConsumerRecords} from {@link KafkaConsumer} in a
    +     * non-blocking manner, signaling yield if no records were received from
    +     * Kafka
    +     * <br>
    +     * - if records were received form Kafka, the are written to a newly created
    +     * {@link FlowFile}'s {@link OutputStream} using a provided demarcator (see
    +     * {@link #MESSAGE_DEMARCATOR}
    +     */
    +    @Override
    +    protected void rendezvousWithKafka(ProcessContext context, ProcessSession processSession) throws ProcessException {
    +        ConsumerRecords<byte[], byte[]> consumedRecords = this.kafkaResource.poll(100);
    +        if (consumedRecords != null && !consumedRecords.isEmpty()) {
    +            long start = System.nanoTime();
    +            FlowFile flowFile = processSession.create();
    +            final AtomicInteger messageCounter = new AtomicInteger();
    +
    +            for (final ConsumerRecord<byte[], byte[]> consumedRecord : consumedRecords) {
    +                flowFile = processSession.append(flowFile, new OutputStreamCallback() {
    +                    @Override
    +                    public void process(final OutputStream out) throws IOException {
    +                        if (messageCounter.getAndIncrement() > 0) {
    +                            out.write(ConsumeKafka.this.demarcatorBytes);
    +                        }
    +                        out.write(consumedRecord.value());
    +                    }
    +                });
    +            }
    +
    +            this.releaseFlowFile(flowFile, processSession, flowFile.getAttributes(), start, messageCounter.get() - 1);
    --- End diff --
    
    Good catch as well. Some leftovers after refactoring something else ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60792649
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java ---
    @@ -78,152 +81,182 @@
         }
     
         /**
    -     *
    -     */
    -    void setProcessLog(ProcessorLog processLog) {
    -        this.processLog = processLog;
    -    }
    -
    -    /**
    -     * Publishes messages to Kafka topic. It supports three publishing
    -     * mechanisms.
    +     * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to
    +     * determine how many messages to Kafka will be sent from a provided
    +     * {@link InputStream} (see {@link PublishingContext#getContentStream()}).
    +     * It supports two publishing modes:
          * <ul>
    -     * <li>Sending the entire content stream as a single Kafka message.</li>
    -     * <li>Splitting the incoming content stream into chunks and sending
    -     * individual chunks as separate Kafka messages.</li>
    -     * <li>Splitting the incoming content stream into chunks and sending only
    -     * the chunks that have failed previously @see
    -     * {@link SplittableMessageContext#getFailedSegments()}.</li>
    +     * <li>Sending all messages constructed from
    +     * {@link StreamDemarcator#nextToken()} operation.</li>
    +     * <li>Sending only unacknowledged messages constructed from
    +     * {@link StreamDemarcator#nextToken()} operation.</li>
          * </ul>
    +     * The unacknowledged messages are determined from the value of
    +     * {@link PublishingContext#getLastAckedMessageIndex()}.
    +     * <br>
          * This method assumes content stream affinity where it is expected that the
          * content stream that represents the same Kafka message(s) will remain the
          * same across possible retries. This is required specifically for cases
          * where delimiter is used and a single content stream may represent
    -     * multiple Kafka messages. The failed segment list will keep the index of
    -     * of each content stream segment that had failed to be sent to Kafka, so
    -     * upon retry only the failed segments are sent.
    +     * multiple Kafka messages. The
    +     * {@link PublishingContext#getLastAckedMessageIndex()} will provide the
    +     * index of the last ACKed message, so upon retry only messages with the
    +     * higher index are sent.
          *
    -     * @param messageContext
    -     *            instance of {@link SplittableMessageContext} which hold
    -     *            context information about the message to be sent
    -     * @param contentStream
    -     *            instance of open {@link InputStream} carrying the content of
    -     *            the message(s) to be send to Kafka
    -     * @param partitionKey
    -     *            the value of the partition key. Only relevant is user wishes
    -     *            to provide a custom partition key instead of relying on
    -     *            variety of provided {@link Partitioner}(s)
    -     * @param maxBufferSize maximum message size
    -     * @return The set containing the failed segment indexes for messages that
    -     *         failed to be sent to Kafka.
    +     * @param publishingContext
    +     *            instance of {@link PublishingContext} which hold context
    +     *            information about the message(s) to be sent.
    +     * @return The index of the last successful offset.
          */
    -    BitSet publish(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey,
    -            int maxBufferSize) {
    -        List<Future<RecordMetadata>> sendFutures = this.split(messageContext, contentStream, partitionKey, maxBufferSize);
    -        return this.publish(sendFutures);
    +    KafkaPublisherResult publish(PublishingContext publishingContext) {
    +        StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(),
    +                publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());
    +
    +        int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex();
    +        List<Future<RecordMetadata>> resultFutures = new ArrayList<>();
    +
    +        byte[] messageBytes;
    +        int tokenCounter = 0;
    +        for (; (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
    +            if (prevLastAckedMessageIndex < tokenCounter) {
    +                Integer partitionId = publishingContext.getPartitionId();
    +                if (partitionId == null && publishingContext.getKeyBytes() != null) {
    +                    partitionId = this.getPartition(publishingContext.getKeyBytes(), publishingContext.getTopic());
    +                }
    +                ProducerRecord<byte[], byte[]> message =
    +                        new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getPartitionId(), publishingContext.getKeyBytes(), messageBytes);
    +                resultFutures.add(this.kafkaProducer.send(message));
    +            }
    +        }
    +
    +        tokenCounter -= 1;
    --- End diff --
    
    this decrement then increment appears to be left over from some previous or incomplete logic.  Probably can remove this line and drop the increment that follows, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIF...

Posted by andrewmlim <gi...@git.apache.org>.
Github user andrewmlim commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-219545549
  
    For the \u201cClient ID\u201d property help:   \u201cCorrespons\u201d should be changed to \u201cCorresponds\u201d.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIF...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-219106272
  
    @joewitt the OOM tests are still here and running. They were not @Ignored. I've adjusted them a bit to play nice with the overall build. The @Ignored tests are only affecting the once that run with embedded Kafka which we constantly have to start/stop to run each test in isolation and that takes time. The system.out calls are *only* coming from one test class (3 test cases) and I kept them there for a reason, primarily to help with review process where values can be visualized by the reviewer. I do intend to remove them in the future after all are comfortable with the approach.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60829110
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.Closeable;
    +import java.lang.reflect.Field;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.MockProcessSession;
    +import org.apache.nifi.util.MockSessionFactory;
    +import org.apache.nifi.util.SharedSessionState;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Test;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +public class AbstractKafkaProcessorLifecycelTest {
    +
    +    @Test
    +    public void validateBaseProperties() throws Exception {
    +        TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class);
    +        runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
    +        runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
    +
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid"));
    +        }
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
    +
    +        runner.removeProperty(ConsumeKafka.TOPIC);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'topic' is invalid because topic is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.TOPIC, "blah");
    +
    +        runner.removeProperty(ConsumeKafka.CLIENT_ID);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("invalid because client.id is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "   ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
    +
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +    }
    +
    +    /*
    +     * The goal of this test is to validate the correctness of
    +     * AbstractKafkaProcessor's implementation of onTrigger() in a highly
    +     * concurrent environment. That is:
    +     * - upon processing failures (e.g., unhandled exceptions), the target Kafka
    +     *   resource is reset (closed and re-created)
    +     * - no FlowFile is unaccounted for. FlowFiles left in the queue and FlowFiles
    +     *   in Success relationship = testCount
    +     * - failed executions that did not result in the call to close/reset summed with
    +     *   verified calls to close should equal total request failed
    +     */
    +    @Test
    +    public void validateLifecycleCorrectnessWithProcessingFailures() throws Exception {
    --- End diff --
    
    @olegz Doesn't change the result on my side. Note that the tests are OK within eclipse but this test fails when running maven build (Windows 10). I also tried:
    ````java
            System.setProperty("file.encoding", "UTF-8");
            System.setProperty("user.language", "en");
    ````
    but no luck.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60828729
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.Closeable;
    +import java.lang.reflect.Field;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.MockProcessSession;
    +import org.apache.nifi.util.MockSessionFactory;
    +import org.apache.nifi.util.SharedSessionState;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Test;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +public class AbstractKafkaProcessorLifecycelTest {
    +
    +    @Test
    +    public void validateBaseProperties() throws Exception {
    +        TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class);
    +        runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
    +        runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
    +
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid"));
    +        }
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
    +
    +        runner.removeProperty(ConsumeKafka.TOPIC);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'topic' is invalid because topic is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.TOPIC, "blah");
    +
    +        runner.removeProperty(ConsumeKafka.CLIENT_ID);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("invalid because client.id is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "   ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
    +
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +    }
    +
    +    /*
    +     * The goal of this test is to validate the correctness of
    +     * AbstractKafkaProcessor's implementation of onTrigger() in a highly
    +     * concurrent environment. That is:
    +     * - upon processing failures (e.g., unhandled exceptions), the target Kafka
    +     *   resource is reset (closed and re-created)
    +     * - no FlowFile is unaccounted for. FlowFiles left in the queue and FlowFiles
    +     *   in Success relationship = testCount
    +     * - failed executions that did not result in the call to close/reset summed with
    +     *   verified calls to close should equal total request failed
    +     */
    +    @Test
    +    public void validateLifecycleCorrectnessWithProcessingFailures() throws Exception {
    --- End diff --
    
    @pvillard31 Great catch! Need to change line:188 to look like this
    ```
    String[] events = new String(flowFile.toByteArray(), StandardCharsets.UTF_8).split("blah");
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60837830
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java ---
    @@ -0,0 +1,350 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.ByteArraySerializer;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
    +import org.apache.nifi.processors.kafka.pubsub.Partitioners.RoundRobinPartitioner;
    +
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
    +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a "
    +        + "user-specified delimiter, such as a new-line.")
    +@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
    +                 description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
    +        + " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be"
    +        + " overriden with warning message describing the override."
    +        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.")
    +public class PublishKafka extends AbstractKafkaProcessor<KafkaPublisher> {
    +
    +    protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id";
    +
    +    protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx";
    +
    +    protected static final String FAILED_TOPIC_ATTR = "failed.topic";
    +
    +    protected static final String FAILED_KEY_ATTR = "failed.key";
    +
    +    protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter";
    +
    +    static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
    +            "FlowFile will be routed to failure unless the message is replicated to the appropriate "
    +                    + "number of Kafka Nodes according to the Topic configuration");
    +    static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery",
    +            "FlowFile will be routed to success if the message is received by a single Kafka node, "
    +                    + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> "
    +                    + "but can result in data loss if a Kafka node crashes");
    +    static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort",
    +            "FlowFile will be routed to success after successfully writing the content to a Kafka node, "
    +                    + "without waiting for a response. This provides the best performance but may result in data loss.");
    +
    +
    +    static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(RoundRobinPartitioner.class.getName(),
    +            RoundRobinPartitioner.class.getSimpleName(),
    +            "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
    +                    + "the next Partition to Partition 2, and so on, wrapping as necessary.");
    +    static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
    +            "DefaultPartitioner", "Messages will be assigned to random partitions.");
    +
    +    static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.ACKS_CONFIG)
    +            .displayName("Delivery Guarantee")
    +            .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
    +            .defaultValue(DELIVERY_BEST_EFFORT.getValue())
    +            .build();
    +    static final PropertyDescriptor META_WAIT_TIME = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.MAX_BLOCK_MS_CONFIG)
    +            .displayName("Meta Data Wait Time")
    +            .description("The amount of time KafkaConsumer will wait to obtain metadata during the 'send' call before failing the "
    +                            + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("30 sec")
    +            .build();
    +    static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
    +            .name("kafka-key")
    +            .displayName("Kafka Key")
    +            .description("The Key to use for the Message")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +    static final PropertyDescriptor MESSAGE_DEMARCATOR = MESSAGE_DEMARCATOR_BUILDER
    +            .description("Specifies the string (interpreted as UTF-8) to use for demarcating apart multiple messages within "
    +                    + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the "
    +                            + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. "
    +                            + "To enter special character such as 'new line' use Shift+Enter.")
    +            .build();
    +    static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
    +            .displayName("Partitioner class")
    +            .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
    +            .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
    +            .defaultValue(RANDOM_PARTITIONING.getValue())
    +            .required(false)
    +            .build();
    +    static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
    +            .displayName("Compression Type")
    +            .description("This parameter allows you to specify the compression codec for all data generated by this producer.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .allowableValues("none", "gzip", "snappy", "lz4")
    +            .defaultValue("none")
    +            .build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
    +            .build();
    +
    +    static final List<PropertyDescriptor> descriptors;
    +
    +    static final Set<Relationship> relationships;
    +
    +
    +    /*
    +     * Will ensure that list of PropertyDescriptors is build only once, since
    +     * all other lifecycle methods are invoked multiple times.
    +     */
    +    static {
    +        List<PropertyDescriptor> _descriptors = new ArrayList<>();
    +        _descriptors.addAll(sharedDescriptors);
    +        _descriptors.add(DELIVERY_GUARANTEE);
    +        _descriptors.add(KEY);
    +        _descriptors.add(MESSAGE_DEMARCATOR);
    +        _descriptors.add(META_WAIT_TIME);
    +        _descriptors.add(PARTITION_CLASS);
    +        _descriptors.add(COMPRESSION_CODEC);
    +
    +        descriptors = Collections.unmodifiableList(_descriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.addAll(sharedRelationships);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    /**
    +     * Will rendezvous with Kafka if {@link ProcessSession} contains {@link FlowFile}
    +     * producing a result {@link FlowFile}.
    +     * <br>
    +     * The result {@link FlowFile} that is successful is then transfered to {@link #REL_SUCCESS}
    +     * <br>
    +     * The result {@link FlowFile} that is failed is then transfered to {@link #REL_FAILURE}
    +     *
    +     */
    +    @Override
    +    protected void rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            flowFile = this.doRendezvousWithKafka(flowFile, context, session);
    +            if (!this.isFailedFlowFile(flowFile)) {
    +                session.getProvenanceReporter().send(flowFile, context.getProperty(BOOTSTRAP_SERVERS)
    --- End diff --
    
    i agree that the list pierre provided for provenance events is good to have.  'kafka' is good to have as the scheme.  I do see how the multiple broker urls can be problematic but I believe kafka does expose which broker we talked to.  If not then we can use the first one but i'm pretty sure it is available.  This is important information for provenance.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-213925464
  
    @olegz @joewitt I updated my last comment to reflect the discussion I had with Oleg regarding the task counter: there is no issue, it was my mistake during debug investigation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60829192
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java ---
    @@ -0,0 +1,350 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.ByteArraySerializer;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
    +import org.apache.nifi.processors.kafka.pubsub.Partitioners.RoundRobinPartitioner;
    +
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
    +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a "
    +        + "user-specified delimiter, such as a new-line.")
    +@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
    +                 description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
    +        + " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be"
    +        + " overriden with warning message describing the override."
    +        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.")
    +public class PublishKafka extends AbstractKafkaProcessor<KafkaPublisher> {
    +
    +    protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id";
    +
    +    protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx";
    +
    +    protected static final String FAILED_TOPIC_ATTR = "failed.topic";
    +
    +    protected static final String FAILED_KEY_ATTR = "failed.key";
    +
    +    protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter";
    +
    +    static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
    +            "FlowFile will be routed to failure unless the message is replicated to the appropriate "
    +                    + "number of Kafka Nodes according to the Topic configuration");
    +    static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery",
    +            "FlowFile will be routed to success if the message is received by a single Kafka node, "
    +                    + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> "
    +                    + "but can result in data loss if a Kafka node crashes");
    +    static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort",
    +            "FlowFile will be routed to success after successfully writing the content to a Kafka node, "
    +                    + "without waiting for a response. This provides the best performance but may result in data loss.");
    +
    +
    +    static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(RoundRobinPartitioner.class.getName(),
    +            RoundRobinPartitioner.class.getSimpleName(),
    +            "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
    +                    + "the next Partition to Partition 2, and so on, wrapping as necessary.");
    +    static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
    +            "DefaultPartitioner", "Messages will be assigned to random partitions.");
    +
    +    static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.ACKS_CONFIG)
    +            .displayName("Delivery Guarantee")
    +            .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
    +            .defaultValue(DELIVERY_BEST_EFFORT.getValue())
    +            .build();
    +    static final PropertyDescriptor META_WAIT_TIME = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.MAX_BLOCK_MS_CONFIG)
    +            .displayName("Meta Data Wait Time")
    +            .description("The amount of time KafkaConsumer will wait to obtain metadata during the 'send' call before failing the "
    +                            + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("30 sec")
    +            .build();
    +    static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
    +            .name("kafka-key")
    +            .displayName("Kafka Key")
    +            .description("The Key to use for the Message")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +    static final PropertyDescriptor MESSAGE_DEMARCATOR = MESSAGE_DEMARCATOR_BUILDER
    +            .description("Specifies the string (interpreted as UTF-8) to use for demarcating apart multiple messages within "
    +                    + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the "
    +                            + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. "
    +                            + "To enter special character such as 'new line' use Shift+Enter.")
    +            .build();
    +    static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
    +            .displayName("Partitioner class")
    +            .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
    +            .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
    +            .defaultValue(RANDOM_PARTITIONING.getValue())
    +            .required(false)
    +            .build();
    +    static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
    +            .displayName("Compression Type")
    +            .description("This parameter allows you to specify the compression codec for all data generated by this producer.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .allowableValues("none", "gzip", "snappy", "lz4")
    +            .defaultValue("none")
    +            .build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
    +            .build();
    +
    +    static final List<PropertyDescriptor> descriptors;
    +
    +    static final Set<Relationship> relationships;
    +
    +
    +    /*
    +     * Will ensure that list of PropertyDescriptors is build only once, since
    +     * all other lifecycle methods are invoked multiple times.
    +     */
    +    static {
    +        List<PropertyDescriptor> _descriptors = new ArrayList<>();
    +        _descriptors.addAll(sharedDescriptors);
    +        _descriptors.add(DELIVERY_GUARANTEE);
    +        _descriptors.add(KEY);
    +        _descriptors.add(MESSAGE_DEMARCATOR);
    +        _descriptors.add(META_WAIT_TIME);
    +        _descriptors.add(PARTITION_CLASS);
    +        _descriptors.add(COMPRESSION_CODEC);
    +
    +        descriptors = Collections.unmodifiableList(_descriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.addAll(sharedRelationships);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    /**
    +     * Will rendezvous with Kafka if {@link ProcessSession} contains {@link FlowFile}
    +     * producing a result {@link FlowFile}.
    +     * <br>
    +     * The result {@link FlowFile} that is successful is then transfered to {@link #REL_SUCCESS}
    +     * <br>
    +     * The result {@link FlowFile} that is failed is then transfered to {@link #REL_FAILURE}
    +     *
    +     */
    +    @Override
    +    protected void rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            flowFile = this.doRendezvousWithKafka(flowFile, context, session);
    +            if (!this.isFailedFlowFile(flowFile)) {
    +                session.getProvenanceReporter().send(flowFile, context.getProperty(BOOTSTRAP_SERVERS)
    --- End diff --
    
    Maybe consider a SEND provenance event as suggested in NIFI-1672:
    - add prefix kafka:// in URI
    - add event duration
    - number of published messages in details


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60828756
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.Closeable;
    +import java.lang.reflect.Field;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.MockProcessSession;
    +import org.apache.nifi.util.MockSessionFactory;
    +import org.apache.nifi.util.SharedSessionState;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Test;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +public class AbstractKafkaProcessorLifecycelTest {
    +
    +    @Test
    +    public void validateBaseProperties() throws Exception {
    +        TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class);
    +        runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
    +        runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
    +
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid"));
    +        }
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
    +
    +        runner.removeProperty(ConsumeKafka.TOPIC);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'topic' is invalid because topic is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.TOPIC, "blah");
    +
    +        runner.removeProperty(ConsumeKafka.CLIENT_ID);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("invalid because client.id is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "   ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
    +
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +    }
    +
    +    /*
    +     * The goal of this test is to validate the correctness of
    +     * AbstractKafkaProcessor's implementation of onTrigger() in a highly
    +     * concurrent environment. That is:
    +     * - upon processing failures (e.g., unhandled exceptions), the target Kafka
    +     *   resource is reset (closed and re-created)
    +     * - no FlowFile is unaccounted for. FlowFiles left in the queue and FlowFiles
    +     *   in Success relationship = testCount
    +     * - failed executions that did not result in the call to close/reset summed with
    +     *   verified calls to close should equal total request failed
    +     */
    +    @Test
    +    public void validateLifecycleCorrectnessWithProcessingFailures() throws Exception {
    --- End diff --
    
    @pvillard31 actually it would be nice if you can make the above change locally and confirm.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIF...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-219842672
  
    i certainly agree.  this thing has to be one of the most reviewed PRs of all time.  +1 please do merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIF...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-218763305
  
    applying the patch version of this fails.
    merging this branch fails.
      uto-merging nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/resources/zookeeper.properties
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/resources/zookeeper.properties
    Auto-merging nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/resources/server.properties
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/resources/server.properties
    Auto-merging nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/resources/log4j.properties
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/resources/log4j.properties
    Auto-merging nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html
    Auto-merging nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html
    Auto-merging nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml
    Auto-merging nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/src/main/resources/META-INF/NOTICE
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/src/main/resources/META-INF/NOTICE
    Auto-merging nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/src/main/resources/META-INF/LICENSE
    CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/src/main/resources/META-INF/LICENSE
    Automatic merge failed; fix conflicts and then commit the result.
    
    The logs for travis-ci show an unused import (probably fixed by now).  Please update patch to resolve merge conflicts and I'll try again.  Also, likely worthwhile to have an 0.x and a master version of this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60797930
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.Closeable;
    +import java.lang.reflect.Field;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.MockProcessSession;
    +import org.apache.nifi.util.MockSessionFactory;
    +import org.apache.nifi.util.SharedSessionState;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Test;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +public class AbstractKafkaProcessorLifecycelTest {
    +
    +    @Test
    +    public void validateBaseProperties() throws Exception {
    +        TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class);
    +        runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
    +        runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
    +
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid"));
    +        }
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
    +
    +        runner.removeProperty(ConsumeKafka.TOPIC);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'topic' is invalid because topic is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.TOPIC, "blah");
    +
    +        runner.removeProperty(ConsumeKafka.CLIENT_ID);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("invalid because client.id is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "   ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
    +
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +    }
    +
    +    /*
    +     * The goal of this test is to validate the correctness of
    +     * AbstractKafkaProcessor's implementation of onTrigger() in a highly
    +     * concurrent environment. That is:
    +     * - upon processing failures (e.g., unhandled exceptions), the target Kafka
    +     *   resource is reset (closed and re-created)
    +     * - no FlowFile is unaccounted for. FlowFiles left in the queue and FlowFiles
    +     *   in Success relationship = testCount
    +     * - failed executions that did not result in the call to close/reset summed with
    +     *   verified calls to close should equal total request failed
    +     */
    +    @Test
    +    public void validateLifecycleCorrectnessWithProcessingFailures() throws Exception {
    --- End diff --
    
    I see this fairly often.
    
    
    -------------------------------------------------------
     T E S T S
    -------------------------------------------------------
    Running org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest
    Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.811 sec <<< FAILURE! - in org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest
    validateLifecycleCorrectnessWithProcessingFailures(org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest)  Time elapsed: 0.694 sec  <<< FAILURE!
    java.lang.AssertionError: expected:<10000> but was:<9999>
    	at org.junit.Assert.fail(Assert.java:88)
    	at org.junit.Assert.failNotEquals(Assert.java:834)
    	at org.junit.Assert.assertEquals(Assert.java:645)
    	at org.junit.Assert.assertEquals(Assert.java:631)
    	at org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest.validateLifecycleCorrectnessWithProcessingFailures(AbstractKafkaProcessorLifecycelTest.java:221)



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60829511
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.Closeable;
    +import java.lang.reflect.Field;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.MockProcessSession;
    +import org.apache.nifi.util.MockSessionFactory;
    +import org.apache.nifi.util.SharedSessionState;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Test;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +public class AbstractKafkaProcessorLifecycelTest {
    +
    +    @Test
    +    public void validateBaseProperties() throws Exception {
    +        TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class);
    +        runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
    +        runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
    +
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid"));
    +        }
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
    +
    +        runner.removeProperty(ConsumeKafka.TOPIC);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'topic' is invalid because topic is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.TOPIC, "blah");
    +
    +        runner.removeProperty(ConsumeKafka.CLIENT_ID);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("invalid because client.id is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "   ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
    +
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +    }
    +
    +    /*
    +     * The goal of this test is to validate the correctness of
    +     * AbstractKafkaProcessor's implementation of onTrigger() in a highly
    +     * concurrent environment. That is:
    +     * - upon processing failures (e.g., unhandled exceptions), the target Kafka
    +     *   resource is reset (closed and re-created)
    +     * - no FlowFile is unaccounted for. FlowFiles left in the queue and FlowFiles
    +     *   in Success relationship = testCount
    +     * - failed executions that did not result in the call to close/reset summed with
    +     *   verified calls to close should equal total request failed
    +     */
    +    @Test
    +    public void validateLifecycleCorrectnessWithProcessingFailures() throws Exception {
    --- End diff --
    
    It seems that -D< property >=< value > where property contains "." is not allowed. I tried with -Dencoding=UTF-8 and no change. Then I found:
    http://stackoverflow.com/questions/17656475/maven-source-encoding-in-utf-8-not-working
    
    I added in nifi-kafka-pubsub-processors\pom.xml
    ````xml
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-surefire-plugin</artifactId>
    				<configuration>
    					<argLine>-Dfile.encoding=UTF-8</argLine>
    				</configuration>
    			</plugin>
    		</plugins>
    	</build>
    ````
    
    And it is now OK. Maybe this should be placed in a parent pom file?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60828026
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.Closeable;
    +import java.lang.reflect.Field;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.MockProcessSession;
    +import org.apache.nifi.util.MockSessionFactory;
    +import org.apache.nifi.util.SharedSessionState;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Test;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +public class AbstractKafkaProcessorLifecycelTest {
    +
    +    @Test
    +    public void validateBaseProperties() throws Exception {
    +        TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class);
    +        runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
    +        runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
    +
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid"));
    +        }
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
    +
    +        runner.removeProperty(ConsumeKafka.TOPIC);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'topic' is invalid because topic is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.TOPIC, "blah");
    +
    +        runner.removeProperty(ConsumeKafka.CLIENT_ID);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("invalid because client.id is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "   ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
    +
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +    }
    +
    +    /*
    +     * The goal of this test is to validate the correctness of
    +     * AbstractKafkaProcessor's implementation of onTrigger() in a highly
    +     * concurrent environment. That is:
    +     * - upon processing failures (e.g., unhandled exceptions), the target Kafka
    +     *   resource is reset (closed and re-created)
    +     * - no FlowFile is unaccounted for. FlowFiles left in the queue and FlowFiles
    +     *   in Success relationship = testCount
    +     * - failed executions that did not result in the call to close/reset summed with
    +     *   verified calls to close should equal total request failed
    +     */
    +    @Test
    +    public void validateLifecycleCorrectnessWithProcessingFailures() throws Exception {
    --- End diff --
    
    @olegz @joewitt I do see the same test failure on my side and also the one below:
    
    ````
    Running org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaTest
    Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 52.39 sec <<< FAILURE! - in org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaTest
    validateGetAllMessagesWithProvidedDemarcator(org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaTest)  Time elapsed: 1.112 sec  <<< FAILURE!
    org.junit.ComparisonFailure: expected:<[????????????]-3> but was:<[????????????]-3>
    	at org.junit.Assert.assertEquals(Assert.java:115)
    	at org.junit.Assert.assertEquals(Assert.java:144)
    	at org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaTest.validateGetAllMessagesWithProvidedDemarcator(ConsumeKafkaTest.java:191)
    ````
    But maybe something is needed on my side to support Russian and Japanese characters?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60828173
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java ---
    @@ -0,0 +1,285 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import java.io.Closeable;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.regex.Pattern;
    +
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyDescriptor.Builder;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.FormatUtils;
    +
    +/**
    + * Base class for {@link Processor}s to publish and consume messages from Kafka
    + *
    + * @see PublishKafka
    + * @see ConsumeKafka
    + */
    +abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessionFactoryProcessor {
    +
    +    private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
    +
    +    private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
    +
    +
    +    static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
    +    static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
    +    static final AllowableValue SEC_SASL_PLAINTEXT = new AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT");
    +    static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", "SASL_SSL", "SASL_SSL");
    +
    +    static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
    +            .displayName("Kafka Brokers")
    +            .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
    +            .expressionLanguageSupported(true)
    +            .defaultValue("localhost:9092")
    +            .build();
    +    static final PropertyDescriptor CLIENT_ID = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.CLIENT_ID_CONFIG)
    +            .displayName("Client ID")
    +            .description("String value uniquely identiofying this client application. Correspons to Kafka's 'client.id' property.")
    --- End diff --
    
    Some typos in the description. I also noticed some in ConsumeKafka with AUTO_OFFSET_RESET and MESSAGE_DEMARCATOR properties.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIF...

Posted by andrewmlim <gi...@git.apache.org>.
Github user andrewmlim commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-219836475
  
    @pvillard31 @olegz Just wanted to let you both know that during the demo of these new Kafka processors, it was suggested that we also update the Put and Get Kafka processor descriptions.  I filed a ticket for that:  https://issues.apache.org/jira/browse/NIFI-1889


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIF...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-216842191
  
    I just played a little bit with the processors and here are some remarks:
    
    - Cosmetic remarks
    1. add the duration in provenance events
    2. add the number of messages sent/received (useful when batching)
    
    - Testing
    1. Simple case
    When I have a topic with one broker and one partition. It seems to be OK: no more data loss, demarcator seems to be used correctly. LGTM.
    
    2. I then deleted my topic and recreated it with 5 partitions. I am sending flow files with 2 JSON strings (one by line). The PublishKafka is set with appropriate demarcator, plaintext, guarantee replicated delivery, round robin partitioner and snappy compression. All messages are correctly sent to Kafka. The ConsumeKafka has no particular settings (no demarcator). The thing is I do not retrieve all the published messages. For each FlowFile, each JSON contains the date, I can confirm that the very first flow file is correctly published and consumed (two messages), then I have 3 FlowFiles (not consecutive) for which 2 messages are published but only one is consumed.
    
    I don't know it that helps a lot. I'll try to look closer later in the day.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60830637
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.Closeable;
    +import java.lang.reflect.Field;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.MockProcessSession;
    +import org.apache.nifi.util.MockSessionFactory;
    +import org.apache.nifi.util.SharedSessionState;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Test;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +public class AbstractKafkaProcessorLifecycelTest {
    +
    +    @Test
    +    public void validateBaseProperties() throws Exception {
    +        TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class);
    +        runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
    +        runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
    +
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid"));
    +        }
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
    +
    +        runner.removeProperty(ConsumeKafka.TOPIC);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'topic' is invalid because topic is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.TOPIC, "blah");
    +
    +        runner.removeProperty(ConsumeKafka.CLIENT_ID);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("invalid because client.id is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "   ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
    +
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +    }
    +
    +    /*
    +     * The goal of this test is to validate the correctness of
    +     * AbstractKafkaProcessor's implementation of onTrigger() in a highly
    +     * concurrent environment. That is:
    +     * - upon processing failures (e.g., unhandled exceptions), the target Kafka
    +     *   resource is reset (closed and re-created)
    +     * - no FlowFile is unaccounted for. FlowFiles left in the queue and FlowFiles
    +     *   in Success relationship = testCount
    +     * - failed executions that did not result in the call to close/reset summed with
    +     *   verified calls to close should equal total request failed
    +     */
    +    @Test
    +    public void validateLifecycleCorrectnessWithProcessingFailures() throws Exception {
    --- End diff --
    
    Sure, see [NIFI-1806](https://issues.apache.org/jira/browse/NIFI-1806).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIF...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/366


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-213764931
  
    I think there is an issue on Consumer side. Here is my test case:
    GenerateFlowFile -> ReplaceText -> PublishKafka
    ConsuleKafka -> UpdateAttribute -> PutFile
    The update attribute processor is to change the filename with uuid to ensure a file is correctly created for each message consumed.
    
    I start all my processors, and after a short moment I only stop the GenerateFlowFile. What I observe is a number X of messages published (confirmed by checking Kafka topic) and a lower number Y of messages consumed. However, the last message published is consumed (I checked the created file). So it seems that some messages are simply skipped.
    
    Then I started only the publish part to have 16 messages published. Stopped the publish part and started the consuming part. I consumed only one message and the number of tasks on ConsumeKafka processor is continually growing (I stopped everything when having 200+ tasks indicated on the processor).
    
    Went in debug, and:
    ````java
    ConsumerRecords<byte[], byte[]> consumedRecords = this.kafkaResource.poll(100);
    ````
    is empty.
    
    Besides:
    ````java
    boolean reset = this.taskCounter.decrementAndGet() == 0 && !this.acceptTask;
    ````
    returns false because this.taskCounter.decrementAndGet() = -1
    
    I think it explains the growing number of tasks. But not why some messages are skipped in the process. Will try to figure out why this happens.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-213736152
  
    I am going to test the processors with a local installation of Kafka.
    A quick question though. Are we going to keep 4 processors for Kafka?
    If yes, I think we should add additional information in ````@CapabilityDescription```` to help users understand the differences between GetKafka/ConsumeKafka and PutKafka/PublishKafka. As it is now, it can be confusing for users regarding which processor they should use. If we are going to remove Get/Put and use Consume/Publish instead, you can ignore my comment...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIF...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-216852394
  
    @pvillard31 Thanks for playing with it. Quick question. I don't quite understand the second point.So let me try to paraphrase it until I stumble:
    1. You say PublishKafka publishes correctly. I am assuming you were able to verify that all messages that you have published were published. Correct?
    2. The fist FlowFile you published with two messages is consumed by ConsumeKafka correctly. Correct?. The second FlowFile published . . . that is where I am lost in translation ;)
    
    Just trying to understand a scenario so i can may be add a test for it.
    
    Cheers


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIF...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-219830951
  
    Reviewed and tested. LGTM. All tested flows were OK.
    
    Few minor remarks:
    - I'd update @CapabilityDescription to help users differentiate Publish/Put and Consume/Get. As it stands now they will possibly wonder why we have 4 processors to deal with Kafka when listing processors from the UI.
    - There is a comment from @joewitt regarding the scope of close() in AbstractKafkaProcessor. Is it still an open issue?
    
    Otherwise I'm a +1, great job!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIF...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-218793849
  
    Tests run: 0, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 441.183 sec - in org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaTest
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60829253
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java ---
    @@ -0,0 +1,350 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.ByteArraySerializer;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
    +import org.apache.nifi.processors.kafka.pubsub.Partitioners.RoundRobinPartitioner;
    +
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
    +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a "
    +        + "user-specified delimiter, such as a new-line.")
    +@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
    +                 description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
    +        + " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be"
    +        + " overriden with warning message describing the override."
    +        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.")
    +public class PublishKafka extends AbstractKafkaProcessor<KafkaPublisher> {
    +
    +    protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id";
    +
    +    protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx";
    +
    +    protected static final String FAILED_TOPIC_ATTR = "failed.topic";
    +
    +    protected static final String FAILED_KEY_ATTR = "failed.key";
    +
    +    protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter";
    +
    +    static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
    +            "FlowFile will be routed to failure unless the message is replicated to the appropriate "
    +                    + "number of Kafka Nodes according to the Topic configuration");
    +    static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery",
    +            "FlowFile will be routed to success if the message is received by a single Kafka node, "
    +                    + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> "
    +                    + "but can result in data loss if a Kafka node crashes");
    +    static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort",
    +            "FlowFile will be routed to success after successfully writing the content to a Kafka node, "
    +                    + "without waiting for a response. This provides the best performance but may result in data loss.");
    +
    +
    +    static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(RoundRobinPartitioner.class.getName(),
    +            RoundRobinPartitioner.class.getSimpleName(),
    +            "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
    +                    + "the next Partition to Partition 2, and so on, wrapping as necessary.");
    +    static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
    +            "DefaultPartitioner", "Messages will be assigned to random partitions.");
    +
    +    static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.ACKS_CONFIG)
    +            .displayName("Delivery Guarantee")
    +            .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
    +            .defaultValue(DELIVERY_BEST_EFFORT.getValue())
    +            .build();
    +    static final PropertyDescriptor META_WAIT_TIME = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.MAX_BLOCK_MS_CONFIG)
    +            .displayName("Meta Data Wait Time")
    +            .description("The amount of time KafkaConsumer will wait to obtain metadata during the 'send' call before failing the "
    +                            + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("30 sec")
    +            .build();
    +    static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
    +            .name("kafka-key")
    +            .displayName("Kafka Key")
    +            .description("The Key to use for the Message")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +    static final PropertyDescriptor MESSAGE_DEMARCATOR = MESSAGE_DEMARCATOR_BUILDER
    +            .description("Specifies the string (interpreted as UTF-8) to use for demarcating apart multiple messages within "
    +                    + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the "
    +                            + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. "
    +                            + "To enter special character such as 'new line' use Shift+Enter.")
    +            .build();
    +    static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
    +            .displayName("Partitioner class")
    +            .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
    +            .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
    +            .defaultValue(RANDOM_PARTITIONING.getValue())
    +            .required(false)
    +            .build();
    +    static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
    +            .displayName("Compression Type")
    +            .description("This parameter allows you to specify the compression codec for all data generated by this producer.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .allowableValues("none", "gzip", "snappy", "lz4")
    +            .defaultValue("none")
    +            .build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
    +            .build();
    +
    +    static final List<PropertyDescriptor> descriptors;
    +
    +    static final Set<Relationship> relationships;
    +
    +
    +    /*
    +     * Will ensure that list of PropertyDescriptors is build only once, since
    +     * all other lifecycle methods are invoked multiple times.
    +     */
    +    static {
    +        List<PropertyDescriptor> _descriptors = new ArrayList<>();
    +        _descriptors.addAll(sharedDescriptors);
    +        _descriptors.add(DELIVERY_GUARANTEE);
    +        _descriptors.add(KEY);
    +        _descriptors.add(MESSAGE_DEMARCATOR);
    +        _descriptors.add(META_WAIT_TIME);
    +        _descriptors.add(PARTITION_CLASS);
    +        _descriptors.add(COMPRESSION_CODEC);
    +
    +        descriptors = Collections.unmodifiableList(_descriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.addAll(sharedRelationships);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    /**
    +     * Will rendezvous with Kafka if {@link ProcessSession} contains {@link FlowFile}
    +     * producing a result {@link FlowFile}.
    +     * <br>
    +     * The result {@link FlowFile} that is successful is then transfered to {@link #REL_SUCCESS}
    +     * <br>
    +     * The result {@link FlowFile} that is failed is then transfered to {@link #REL_FAILURE}
    +     *
    +     */
    +    @Override
    +    protected void rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            flowFile = this.doRendezvousWithKafka(flowFile, context, session);
    +            if (!this.isFailedFlowFile(flowFile)) {
    +                session.getProvenanceReporter().send(flowFile, context.getProperty(BOOTSTRAP_SERVERS)
    --- End diff --
    
    Good point, although I am removing ```kafka://`` (both consume and publish)`. It is not a real protocol and it can get confusing when multiple broker URLs are provided


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIF...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-216917150
  
    @olegz No problem ;) I was quite in a hurry and I understand why it is not clear at all.
    
    You are correct. By dumping topic content with Kafka CLI, I can confirm that all messages are correctly published. However on 24 messages published (12 FlowFiles), only 21 have been consumed. The 3 missing messages are originating from 3 different FlowFiles. And I noticed that it was not from the first FlowFile published and it was not 3 consecutive flow files (I wanted to check the order just to be sure, it was not related to the deletion / creation of the topic).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIF...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-218787020
  
    [INFO] reporting-task.css (2512b) -> reporting-task.css (1264b)[50%] -> reporting-task.css.gz (488b)[19%]
    Tests run: 5, Failures: 2, Errors: 0, Skipped: 1, Time elapsed: 81.054 sec <<< FAILURE! - in org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest
    validateConcurrencyWithAllFailures(org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest)  Time elapsed: 29.154 sec  <<< FAILURE!
    java.lang.AssertionError: null
    	at org.junit.Assert.fail(Assert.java:86)
    	at org.junit.Assert.assertTrue(Assert.java:41)
    	at org.junit.Assert.assertTrue(Assert.java:52)
    	at org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest.validateConcurrencyWithAllFailures(AbstractKafkaProcessorLifecycelTest.java:369)
    
    validateConcurrencyWithAllSuccesses(org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest)  Time elapsed: 33.284 sec  <<< FAILURE!
    java.lang.AssertionError: null
    	at org.junit.Assert.fail(Assert.java:86)
    	at org.junit.Assert.assertTrue(Assert.java:41)
    	at org.junit.Assert.assertTrue(Assert.java:52)
    	at org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest.validateConcurrencyWithAllSuccesses(AbstractKafkaProcessorLifecycelTest.java:291)



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680 implemented new Kafka proc...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-212462519
  
    @pvillard31 would be nice (if/when you get time) for you to test and review this one. Basically we want to try to merge this one as quick as possible so community get's a chance to play with it before 0.7 is out so we can address whatever may come up. Keep in mind that this is specifically for 0.9+ brokers


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60828535
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java ---
    @@ -0,0 +1,213 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.kafka.clients.consumer.ConsumerConfig;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("Consumes messages from Apache Kafka")
    +@Tags({ "Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume" })
    +public class ConsumeKafka extends AbstractKafkaProcessor<KafkaConsumer<byte[], byte[]>> {
    +
    +    static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
    +
    +    static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
    +
    +    static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
    +
    +    static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
    +            .name(ConsumerConfig.GROUP_ID_CONFIG)
    +            .displayName("Group ID")
    +            .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .expressionLanguageSupported(false)
    +            .build();
    +    static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
    +            .name(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
    +            .displayName("Offset Reset")
    +            .description("Allows you to manage teh condition when there is no initial offset in Kafka or if the current offset does not exist any "
    +                    + "more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.")
    +            .required(true)
    +            .allowableValues(OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE)
    +            .defaultValue(OFFSET_LATEST.getValue())
    +            .build();
    +    static final PropertyDescriptor MESSAGE_DEMARCATOR = MESSAGE_DEMARCATOR_BUILDER
    +            .description("Since KafkaConsumer receives messages in batches, this property allows you to provide a string (interpreted as UTF-8) to use "
    +                    + "for demarcating apart multiple Kafka messages when building a single FlowFile. If not specified, all messages received form Kafka "
    +                            + "will be merged into a single content within a FlowFile. By default it will use 'new line' charcater to demarcate individual messages."
    +                            + "To enter special character such as 'new line' use Shift+Enter.")
    +            .defaultValue("\n")
    +            .build();
    +
    +
    +    static final List<PropertyDescriptor> descriptors;
    +
    +    static final Set<Relationship> relationships;
    +
    +    private volatile byte[] demarcatorBytes;
    +
    +    private volatile String topic;
    +
    +    /*
    +     * Will ensure that list of PropertyDescriptors is build only once, since
    +     * all other lifecycle methods are invoked multiple times.
    +     */
    +    static {
    +        List<PropertyDescriptor> _descriptors = new ArrayList<>();
    +        _descriptors.addAll(sharedDescriptors);
    +        _descriptors.add(GROUP_ID);
    +        _descriptors.add(AUTO_OFFSET_RESET);
    +        _descriptors.add(MESSAGE_DEMARCATOR);
    +        descriptors = Collections.unmodifiableList(_descriptors);
    +
    +        relationships = Collections.unmodifiableSet(sharedRelationships);
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    /**
    +     * Will unsubscribe form {@link KafkaConsumer} delegating to 'super' to do
    +     * the rest.
    +     */
    +    @Override
    +    @OnStopped
    +    public void close() {
    +        if (this.kafkaResource != null) {
    +            try {
    +                this.kafkaResource.unsubscribe();
    +            } finally { // in the event the above fails
    +                super.close();
    +            }
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    /**
    +     * Will rendezvous with Kafka by performing the following:
    +     * <br>
    +     * - poll {@link ConsumerRecords} from {@link KafkaConsumer} in a
    +     * non-blocking manner, signaling yield if no records were received from
    +     * Kafka
    +     * <br>
    +     * - if records were received form Kafka, the are written to a newly created
    +     * {@link FlowFile}'s {@link OutputStream} using a provided demarcator (see
    +     * {@link #MESSAGE_DEMARCATOR}
    +     */
    +    @Override
    +    protected void rendezvousWithKafka(ProcessContext context, ProcessSession processSession) throws ProcessException {
    +        ConsumerRecords<byte[], byte[]> consumedRecords = this.kafkaResource.poll(100);
    +        if (consumedRecords != null && !consumedRecords.isEmpty()) {
    +            long start = System.nanoTime();
    +            FlowFile flowFile = processSession.create();
    +            final AtomicInteger messageCounter = new AtomicInteger();
    +
    +            for (final ConsumerRecord<byte[], byte[]> consumedRecord : consumedRecords) {
    +                flowFile = processSession.append(flowFile, new OutputStreamCallback() {
    +                    @Override
    +                    public void process(final OutputStream out) throws IOException {
    +                        if (messageCounter.getAndIncrement() > 0) {
    +                            out.write(ConsumeKafka.this.demarcatorBytes);
    +                        }
    +                        out.write(consumedRecord.value());
    +                    }
    +                });
    +            }
    +
    +            this.releaseFlowFile(flowFile, processSession, flowFile.getAttributes(), start, messageCounter.get() - 1);
    +            this.kafkaResource.commitSync();
    +        } else {
    +            context.yield();
    +        }
    +    }
    +
    +    /**
    +     * Builds and instance of {@link KafkaConsumer} and subscribes to a provided
    +     * topic.
    +     */
    +    @Override
    +    protected KafkaConsumer<byte[], byte[]> buildKafkaResource(ProcessContext context, ProcessSession session) throws ProcessException {
    +        this.demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
    +        this.topic = context.getProperty(TOPIC).evaluateAttributeExpressions().getValue();
    +
    +        Properties kafkaProperties = this.buildKafkaProperties(context);
    +        kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    +        kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    +
    +        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(kafkaProperties);
    +        consumer.subscribe(Collections.singletonList(this.topic));
    +        return consumer;
    +    }
    +
    +    /**
    +     * Will release flow file. Releasing of the flow file in the context of this
    +     * operation implies the following:
    +     *
    +     * If Empty then remove from session and return If has something then
    +     * transfer to {@link #REL_SUCCESS}
    +     */
    +    private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map<String, String> attributes, long start, int msgCount) {
    +        flowFile = session.putAllAttributes(flowFile, attributes);
    +        long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    +        session.getProvenanceReporter().receive(flowFile, "kafka://" + this.topic, "Received " + msgCount + " Kafka messages", millis);
    --- End diff --
    
    I think IP bootstrap servers IPs is/are missing if the URI: kafka://<IPs/Ports>/topic, no?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-213767740
  
    Actually @pvillard31 this is yet another great observation as @joewitt and I have seen something similar during the intermittent failures of ```PublishKafkaTest.validateDemarcationIntoEmptyMessages``` (missing messages).  I also remember some chatter about similar things on Kafka mailing list specifically related to new API. I'll work on it as well as tidy up lifecycle as it seems yo be an issue for empty polls.
    That said, there is also 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60830161
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.Closeable;
    +import java.lang.reflect.Field;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.MockProcessSession;
    +import org.apache.nifi.util.MockSessionFactory;
    +import org.apache.nifi.util.SharedSessionState;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Test;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +public class AbstractKafkaProcessorLifecycelTest {
    +
    +    @Test
    +    public void validateBaseProperties() throws Exception {
    +        TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class);
    +        runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
    +        runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
    +
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid"));
    +        }
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
    +
    +        runner.removeProperty(ConsumeKafka.TOPIC);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'topic' is invalid because topic is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.TOPIC, "blah");
    +
    +        runner.removeProperty(ConsumeKafka.CLIENT_ID);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("invalid because client.id is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "   ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
    +
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +    }
    +
    +    /*
    +     * The goal of this test is to validate the correctness of
    +     * AbstractKafkaProcessor's implementation of onTrigger() in a highly
    +     * concurrent environment. That is:
    +     * - upon processing failures (e.g., unhandled exceptions), the target Kafka
    +     *   resource is reset (closed and re-created)
    +     * - no FlowFile is unaccounted for. FlowFiles left in the queue and FlowFiles
    +     *   in Success relationship = testCount
    +     * - failed executions that did not result in the call to close/reset summed with
    +     *   verified calls to close should equal total request failed
    +     */
    +    @Test
    +    public void validateLifecycleCorrectnessWithProcessingFailures() throws Exception {
    --- End diff --
    
    @pvillard31 arghhhhh! such a standard property notation ;) Anyway, I do agree that it has to be added to parent POM, although I'd argue that it has to go into the most parent POM as this will be affecting more components as we are making our tests more comprehensive . Would you mind raising the JIRA for it and point to this discussion? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680 implemented new Kafka proc...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-212463926
  
    @olegz No problem. Currently in London for a training, will try to test/review it on Friday.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIF...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-219099819
  
    Am able to build now after rebasing this on top of master.  However, the ignored tests weren't just long running they were also unreliable as shown in the previous OutOfMemoryError and the preceding lifecycletest issue.  They should be made faster/more discrete/reliable.  Lots of System.out calls.  Those should use the proper logger or be removed.  There is an ignored test 'for sanity'.  Fix or remove.
    
    This is closer but needs those items addressed before merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIF...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/366#issuecomment-216890092
  
    As far as cosmetic remarks, all addressed will push soon.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---