You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nifi.apache.org by olegz <gi...@git.apache.org> on 2016/02/01 20:57:51 UTC

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

GitHub user olegz opened a pull request:

    https://github.com/apache/nifi/pull/200

    NIFI-865 added initial support for AMQP publish/subscribe

    added initial documentation and testing. More tesing will be added

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/olegz/nifi NIFI-865

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/200.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #200
    
----
commit 0fb8f008a8469d8e05864a7d95ad11f1a18ac0e1
Author: Oleg Zhurakousky <ol...@suitcase.io>
Date:   2016-01-31T18:20:14Z

    NIFI-865 added initial support for AMQP publish/subscribe
    added initial documentation and testing. More tesing will be added

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/200#issuecomment-178325971
  
    Had checkstyle violations for unused imports:
    
    > [INFO] --- maven-checkstyle-plugin:2.15:check (check-style) @ nifi-amqp-processors ---
    [WARNING] src/test/java/org/apache/nifi/amqp/processors/TestConnection.java[21:8] (imports) UnusedImports: Unused import - java.net.InetSocketAddress.
    [WARNING] src/test/java/org/apache/nifi/amqp/processors/TestConnection.java[22:8] (imports) UnusedImports: Unused import - java.net.InterfaceAddress.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51594237
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +import com.rabbitmq.client.GetResponse;
    +
    +/**
    + * Consuming AMQP processor which upon each invocation of
    + * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct a
    + * {@link FlowFile} containing the body of the consumed AMQP message and AMQP
    + * properties that came with message which are added to a {@link FlowFile} as
    + * attributes.
    + */
    +@Tags({ "amqp", "rabbit", "get", "message", "receive", "consume" })
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("Creates a AMQP Message from the contents of a FlowFile and sends the message to an AMQP Server")
    +public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
    +
    +    public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
    +            .name("Queue")
    +            .description("Source queue")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All FlowFiles that are received from the AMQP queue are routed to this relationship")
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    private final static Set<Relationship> relationships;
    +
    +    /*
    +     * Will ensure that the list of property descriptors is build only once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(QUEUE);
    +        _propertyDescriptors.addAll(descriptors);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    /**
    +     * Will construct a {@link FlowFile} containing the body of the consumed
    +     * AMQP message (if {@link GetResponse} returned by {@link AMQPConsumer} is
    +     * not null) and AMQP properties that came with message which are added to a
    +     * {@link FlowFile} as attributes, transferring {@link FlowFile} to
    +     * 'success' {@link Relationship}.
    +     */
    +    @Override
    +    protected void rendezvousWithAmqp(ProcessContext context, ProcessSession processSession) throws ProcessException {
    +        final GetResponse response = this.targetResource.consume();
    +        if (response != null){
    +            FlowFile flowFile = processSession.create();
    +            flowFile = processSession.append(flowFile, new OutputStreamCallback() {
    --- End diff --
    
    +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/200#issuecomment-178925056
  
    @bbende doing it now


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51600649
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.rabbitmq.client.Channel;
    +import com.rabbitmq.client.Connection;
    +
    +/**
    + * Base class for implementing publishing and consuming AMQP workers.
    + *
    + * @see AMQPPublisher
    + * @see AMQPConsumer
    + */
    +abstract class AMQPWorker implements AutoCloseable {
    +
    +    private final static Logger logger = LoggerFactory.getLogger(AMQPWorker.class);
    +
    +    protected final Channel channel;
    +
    +    /**
    +     * Creates an instance of this worker initializing it with AMQP
    +     * {@link Connection} and creating a target {@link Channel} used by
    +     * sub-classes to interruct with AMQP-based messaging system.
    +     *
    +     * @param connection
    +     *            instance of {@link Connection}
    +     */
    +    public AMQPWorker(Connection connection) {
    +        this.validateConnection(connection);
    +        try {
    +            this.channel = connection.createChannel();
    +        } catch (IOException e) {
    +            logger.error("Failed to create Channel for " + connection, e);
    +            throw new IllegalStateException(e);
    +        }
    +    }
    +
    +    /**
    +     * Closes {@link Channel} created when instance of this class was created.
    +     */
    +    @Override
    +    public void close() throws Exception {
    --- End diff --
    
    channel.close() will only ever throw IOException (at least that's the only checked exception that it will throw) - should close() declare to throw only IOException instead of the general Exception:


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51598556
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.lang.reflect.Method;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Properties;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.rabbitmq.client.AMQP;
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +
    +/**
    + * Publishing AMQP processor which upon each invocation of
    + * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct an
    + * AMQP message sending it to an exchange identified during construction of this
    + * class while transferring the incoming {@link FlowFile} to 'success'
    + * {@link Relationship}.
    + *
    + * Expects that queues, exchanges and bindings are pre-defined by an AMQP
    + * administrator
    + */
    +@Tags({ "amqp", "rabbit", "put", "message", "send", "publish" })
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Creates a AMQP Message from the contents of a FlowFile and sends the message to an AMQP Server")
    +public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
    +
    +    public static final PropertyDescriptor EXCHANGE = new PropertyDescriptor.Builder()
    +            .name("Exchange Name")
    +            .description("The name of the AMQP Exchange the messages will be sent to. Usually provided by the AMQP administrator (e.g., 'amq.direct'). "
    +                    + "It is an optional property. If kept empty the messages will be sent to a default AMQP exchange.")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +    public static final PropertyDescriptor ROUNTING_KEY = new PropertyDescriptor.Builder()
    --- End diff --
    
    Typo in the variable name here - ROUNTING instead of ROUTING


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51600077
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java ---
    @@ -0,0 +1,204 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import com.rabbitmq.client.Connection;
    +import com.rabbitmq.client.ConnectionFactory;
    +
    +/**
    + * Base processor that uses RabbitMQ client API
    + * (https://www.rabbitmq.com/api-guide.html) to rendezvous with AMQP-based
    + * messaging systems version 0.9.1
    + *
    + * @param <T> the type of {@link AMQPWorker}. Please see {@link AMQPPublisher}
    + *            and {@link AMQPConsumer}
    + */
    +abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
    +            .name("Host Name")
    +            .description("Network address of AMQP broker (e.g., localhost)")
    +            .required(true)
    +            .defaultValue("localhost")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
    +            .name("Port")
    +            .description("Numeric value identifying Port of AMQP broker (e.g., 5671)")
    +            .required(true)
    +            .defaultValue("5672")
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor V_HOST = new PropertyDescriptor.Builder()
    +            .name("Virtual Host")
    +            .description("Virtual Host name which segregates AMQP system for enhanced security.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
    +            .name("User Name")
    +            .description("User Name used for authentication and authorization.")
    +            .required(true)
    +            .defaultValue("guest")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
    +            .name("Password")
    +            .description("Password used for authentication and authorization.")
    +            .required(true)
    +            .defaultValue("guest")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .sensitive(true)
    +            .build();
    +    public static final PropertyDescriptor AMQP_VERSION = new PropertyDescriptor.Builder()
    +            .name("AMQP Version")
    +            .description("AMQP Version. Currently only supports AMQP v0.9.1.")
    +            .required(true)
    +            .allowableValues("0.9.1")
    +            .defaultValue("0.9.1")
    +            .build();
    +
    +    static List<PropertyDescriptor> descriptors = new ArrayList<>();
    +
    +    /*
    +     * Will ensure that list of PropertyDescriptors is build only once, since
    +     * all other lifecycle methods are invoked multiple times
    +     */
    +    static {
    +        Field[] fields = AbstractAMQPProcessor.class.getDeclaredFields();
    +        try {
    +            for (Field field : fields) {
    +                field.setAccessible(true);
    +                if (PropertyDescriptor.class.isAssignableFrom(field.getType())) {
    +                    descriptors.add((PropertyDescriptor) field.get(null));
    +                }
    +            }
    +        } catch (Exception e) {
    +            throw new IllegalStateException("Failed to build PropertyDescriptor list", e);
    +        }
    +    }
    +
    +    protected volatile Connection amqpConnection;
    +
    +    protected volatile T targetResource;
    +
    +    /**
    +     * Will builds target resource ({@link AMQPPublisher} or
    +     * {@link AMQPConsumer}) upon first invocation and will delegate to the
    +     * implementation of
    +     * {@link #rendezvousWithAmqp(ProcessContext, ProcessSession)} method for
    +     * further processing.
    +     */
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        synchronized (this) {
    +            this.buildTargetResource(context);
    +        }
    +        this.rendezvousWithAmqp(context, session);
    +    }
    +
    +    /**
    +     * Will close current AMQP connection.
    +     */
    +    @OnStopped
    +    public void close() {
    +        try {
    +            this.targetResource.close();
    --- End diff --
    
    If the call to createConnection() were to throw an Exception, both targetResource and amqpConnection could be closed. Should check if these objects are null before calling close()


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51599615
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.lang.reflect.Method;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Properties;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.rabbitmq.client.AMQP;
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +
    +/**
    + * Publishing AMQP processor which upon each invocation of
    + * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct an
    + * AMQP message sending it to an exchange identified during construction of this
    + * class while transferring the incoming {@link FlowFile} to 'success'
    + * {@link Relationship}.
    + *
    + * Expects that queues, exchanges and bindings are pre-defined by an AMQP
    + * administrator
    + */
    +@Tags({ "amqp", "rabbit", "put", "message", "send", "publish" })
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Creates a AMQP Message from the contents of a FlowFile and sends the message to an AMQP Server")
    +public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
    +
    +    public static final PropertyDescriptor EXCHANGE = new PropertyDescriptor.Builder()
    +            .name("Exchange Name")
    +            .description("The name of the AMQP Exchange the messages will be sent to. Usually provided by the AMQP administrator (e.g., 'amq.direct'). "
    +                    + "It is an optional property. If kept empty the messages will be sent to a default AMQP exchange.")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +    public static final PropertyDescriptor ROUNTING_KEY = new PropertyDescriptor.Builder()
    +            .name("Routing Key")
    +            .description("The name of the Routing Key that will be used by AMQP to route messages from the exchange to a destination queue(s). "
    +                    + "Usually provided by the administrator (e.g., 'myKey')In the event when messages are sent to a default exchange this property "
    +                    + "corresponds to a destination queue name, otherwise a binding from the Exchange to a Queue via Routing Key must be set "
    +                    + "(usually by the AMQP administrator)")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All FlowFiles that are sent to the AMQP destination are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("All FlowFiles that cannot be routed to the AMQP destination are routed to this relationship")
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    private final static Set<Relationship> relationships;
    +
    +    private final static List<String> amqpPropertyNames = AMQPUtils.getAmqpPropertyNames();
    +
    +    /*
    +     * Will ensure that the list of property descriptors is build only once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(EXCHANGE);
    +        _propertyDescriptors.add(ROUNTING_KEY);
    +        _propertyDescriptors.addAll(descriptors);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    /**
    +     * Will construct AMQP message by extracting its body from the incoming
    +     * {@link FlowFile}. AMQP {@link Properties} will be extracted from the
    +     * {@link FlowFile} and converted to {@link BasicProperties} to be sent
    +     * along with the message. Upon success the incoming {@link FlowFile} is
    +     * transfered to 'success' {@link Relationship} and upon failure FlowFile is
    +     * penalized and transfered to the 'failure' {@link Relationship}
    +     * <br>
    +     * NOTE: Attributes extracted from {@link FlowFile} are considered
    +     * candidates for AMQP properties if their names are prefixed with
    +     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
    +     *
    +     */
    +    @Override
    +    protected void rendezvousWithAmqp(ProcessContext context, ProcessSession processSession) throws ProcessException {
    +        FlowFile flowFile = processSession.get();
    +        if (flowFile != null) {
    +            BasicProperties amqpProperties = this.extractAmqpPropertiesFromFlowFile(flowFile);
    +
    +            byte[] messageContent = this.extractMessage(flowFile, processSession);
    +
    +            try {
    +                this.targetResource.publish(messageContent, amqpProperties);
    +                processSession.transfer(flowFile, REL_SUCCESS);
    +            } catch (Exception e) {
    +                processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
    +                processSession.getProvenanceReporter().receive(flowFile,
    +                        this.amqpConnection.toString() + "/E:" + context.getProperty(EXCHANGE).getValue() + "/RK:"
    +                                + context.getProperty(ROUNTING_KEY).getValue());
    +                this.getLogger().error("Failed while sending message to AMQP via " + this.targetResource, e);
    +                context.yield();
    +            }
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    /**
    +     * Will create an instance of {@link AMQPPublisher}
    +     */
    +    @Override
    +    protected AMQPPublisher finishBuildingTargetResource(ProcessContext context) {
    +        String exchangeName = context.getProperty(EXCHANGE).getValue();
    +        String routingKey = context.getProperty(ROUNTING_KEY).getValue();
    +        return new AMQPPublisher(this.amqpConnection, exchangeName, routingKey, this.getLogger());
    +    }
    +
    +    /**
    +     * Extracts contents of the {@link FlowFile} as byte array.
    +     */
    +    private byte[] extractMessage(FlowFile flowFile, ProcessSession session){
    +        final byte[] messageContent = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, messageContent, true);
    +            }
    +        });
    +        return messageContent;
    +    }
    +
    +    /**
    +     * Extracts AMQP properties from the {@link FlowFile} attributes. Attributes
    +     * extracted from {@link FlowFile} are considered candidates for AMQP
    +     * properties if their names are prefixed with
    +     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
    +     */
    +    private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile) {
    +        Map<String, String> attributes = flowFile.getAttributes();
    +        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
    +        for (Entry<String, String> attributeEntry : attributes.entrySet()) {
    +            if (attributeEntry.getKey().startsWith(AMQPUtils.AMQP_PROP_PREFIX)) {
    +                String amqpPropName = attributeEntry.getKey().split("\\" + AMQPUtils.AMQP_PROP_DELIMITER)[1];
    +                String amqpPropValue = attributeEntry.getValue();
    +                System.out.println(amqpPropertyNames);
    +                try {
    +                    if (amqpPropertyNames.contains(AMQPUtils.AMQP_PROP_PREFIX + amqpPropName)) {
    +                        Method m = builder.getClass().getDeclaredMethod(amqpPropName, String.class);
    +                        m.invoke(builder, amqpPropValue);
    +                    } else {
    +                        getLogger().warn("Unrecogninsed AMQP property '" + amqpPropName + "', will ignore.");
    +                    }
    +                } catch (Exception e) {
    +                    // should really never happen since it should be caught by
    +                    // the above IF.
    +                    getLogger().warn("Failed while tryinhg to build AMQP Properties.", e);
    --- End diff --
    
    Typo in log message - tryinhg instead of trying


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51602345
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Modifier;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +
    +/**
    + * Utility helper class simplify interructions with target AMQP API and NIFI
    + * API.
    + *
    + */
    +abstract class AMQPUtils {
    +
    +    public final static String AMQP_PROP_DELIMITER = "$";
    --- End diff --
    
    Since this becomes the attribute name, I would personally lean toward using a "." as that is a common convention in NiFi, to use attribute.name.separated.with.dots - but if there's a reason for using the $ that's fine.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Re: [GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by DAVID SMITH <da...@btinternet.com>.
Thanks for the prompt reply.  
|    | This email has been sent from a virus-free computer protected by Avast. 
www.avast.com  |

My processor set isn't QPID specific, as you say it is the protocol not the client provider that dictates how the connection is made to AMQP. I used the Apache Qpid JMS client for much the same reasons as you chose RabbitMQ, it was the client I had come across before and also it has been mandated at work that all new AMQP brokers will use the 0.10 protocol or later, and RabbitMQ only works with protocols upto 0.9.1 I discounted it. Also I found that AMQP protocol 0.10 uses a URL and a 'destination' and these two items hold all the information required to make the connection to the broker, whether that be  with username/password or SSL.However, there are some legacy RabbitMQ brokers at work and I was asked to write a set of processors to work with these brokers, initially I tried to do it all in one bundle but it started to get a bit complex so I broke them out into separate bundles.I have looked at the source for your processor set and I have noticed that we have classes with the same names and the classpaths, so I will have to look at modifying mine slightly so that they don't clash. Again I would value any comments when you get chance to look through my code.
Many ThanksDave
David
Great points and in a way confirms what I new all along. AMQP 0.9 and 1.0 might as well be given different names since it is not really an evolution of one from another, but rather two different specs. So, what I think we would need to do is work on the same set of processors for AMQP 1.0 version as a separate bundle. You can me them the same way since really soon the concept of bundle versioning will be coming to NiFi so we won’t be dependent on names as unique identifier.
So if you can work on that please do so and let’s make it happen. 
CheersOleg
Oleg
That sounds really good to me. I will try and get on with the tests asap, if you don't mind I will have a look at how you have done yours.The Apache Qpid Client library also includes an internal 'broker' which I found I can instantiate in my Junit tests and hopefully this may a good way of testing the processors.
Dave 

    On Saturday, 6 February 2016, 23:04, Oleg Zhurakousky <oz...@hortonworks.com> wrote:
 

 I hate spell checker. The last sentence in previous response should read compliant instead of compliment.

Sent from my iPhone

> On Feb 6, 2016, at 17:46, Oleg Zhurakousky <oz...@hortonworks.com> wrote:
> 
> David
> 
> Thank you so much for reaching out.
> The reason why I am using RabbitMQ client library is because I am familiar with it, but as you aware AMQP is a protocol-based specification therefore it doesn’t mater which client library is used as long as they are compliant with the protocol version and current implementation is based on AMQP 0.9.1. 
> Also, you are mentioning QPID client libraries. Do you have an opinion which one is better since as I mentioned I just went with the one I know?. As far as QPID JMS, are you saying that you are using QPID JMS layer to make AMQP look like JMS? If so in my experience layering JMS over AMQP while possible brings a lot of limitations of JMS. In any event would be nice to hear your thoughts on that.
> As far as your processor implementation. Sorry I didn’t have a chance to look at them at the time of writing this response (will look later on), but do you look at them as QPID specific (i.e., QPID vs RabbitMQ)? And if so what is in them that is specific to QPID? The reason why I am asking (and you can see it from discussion on JIRA) is that with this effort we are aiming for processors that are compliment with specific protocol regardless of the broker implementation used, so it must be neutral. 
> 
> Thanks for reaching out once again.
> Cheers
> Oleg
> 
>> On Feb 6, 2016, at 5:19 PM, DAVID SMITH <da...@btinternet.com> wrote:
>> 
>> Hi Guys
>> |    | This email has been sent from a virus-free computer protected by Avast. 
>> www.avast.com  |
>> 
>> 
>> As you may remember I have developed some processors that publish/subscribe to AMQP brokers, but I was having problems writing Junit tests for these processors. I was interested to see that you have been working on NiFi Pull Request 865. I have looked at your code for these processors, we are both using different property descriptors to allow messages to be published and pulled. I also noticed that you are using RabbitMQ libraries to connect to the broker, whereas I connect to the AMQP broker using the QPID JMS libraries. I can still see a use for my processors and I would still be interested getting my processors uploaded to run alongside yours in a future release of NiFi.I have tidied up my code and pushed it back to github:
>> https://github.com/helicopterman22/nifi_amqp_processors.git
>> I would appreciate your feedback Dave
>> 
>> 
>> 
>>  On Wednesday, 3 February 2016, 2:05, asfgit <gi...@git.apache.org> wrote:
>> 
>> 
>> Github user asfgit closed the pull request at:
>> 
>>    https://github.com/apache/nifi/pull/200
>> 
>> 
>> ---
>> If your project is set up for it, you can reply to this email and have your
>> reply appear on GitHub as well. If your project does not have this feature
>> enabled and wishes so, or if the feature is enabled but not working, please
>> contact infrastructure at infrastructure@apache.org or file a JIRA ticket
>> with INFRA.
>> ---
> 

  

Re: [GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
I hate spell checker. The last sentence in previous response should read compliant instead of compliment.

Sent from my iPhone

> On Feb 6, 2016, at 17:46, Oleg Zhurakousky <oz...@hortonworks.com> wrote:
> 
> David
> 
> Thank you so much for reaching out.
> The reason why I am using RabbitMQ client library is because I am familiar with it, but as you aware AMQP is a protocol-based specification therefore it doesn’t mater which client library is used as long as they are compliant with the protocol version and current implementation is based on AMQP 0.9.1. 
> Also, you are mentioning QPID client libraries. Do you have an opinion which one is better since as I mentioned I just went with the one I know?. As far as QPID JMS, are you saying that you are using QPID JMS layer to make AMQP look like JMS? If so in my experience layering JMS over AMQP while possible brings a lot of limitations of JMS. In any event would be nice to hear your thoughts on that.
> As far as your processor implementation. Sorry I didn’t have a chance to look at them at the time of writing this response (will look later on), but do you look at them as QPID specific (i.e., QPID vs RabbitMQ)? And if so what is in them that is specific to QPID? The reason why I am asking (and you can see it from discussion on JIRA) is that with this effort we are aiming for processors that are compliment with specific protocol regardless of the broker implementation used, so it must be neutral. 
> 
> Thanks for reaching out once again.
> Cheers
> Oleg
> 
>> On Feb 6, 2016, at 5:19 PM, DAVID SMITH <da...@btinternet.com> wrote:
>> 
>> Hi Guys
>> |    | This email has been sent from a virus-free computer protected by Avast. 
>> www.avast.com  |
>> 
>> 
>> As you may remember I have developed some processors that publish/subscribe to AMQP brokers, but I was having problems writing Junit tests for these processors. I was interested to see that you have been working on NiFi Pull Request 865. I have looked at your code for these processors, we are both using different property descriptors to allow messages to be published and pulled. I also noticed that you are using RabbitMQ libraries to connect to the broker, whereas I connect to the AMQP broker using the QPID JMS libraries. I can still see a use for my processors and I would still be interested getting my processors uploaded to run alongside yours in a future release of NiFi.I have tidied up my code and pushed it back to github:
>> https://github.com/helicopterman22/nifi_amqp_processors.git
>> I would appreciate your feedback Dave
>> 
>> 
>> 
>>   On Wednesday, 3 February 2016, 2:05, asfgit <gi...@git.apache.org> wrote:
>> 
>> 
>> Github user asfgit closed the pull request at:
>> 
>>    https://github.com/apache/nifi/pull/200
>> 
>> 
>> ---
>> If your project is set up for it, you can reply to this email and have your
>> reply appear on GitHub as well. If your project does not have this feature
>> enabled and wishes so, or if the feature is enabled but not working, please
>> contact infrastructure at infrastructure@apache.org or file a JIRA ticket
>> with INFRA.
>> ---
> 

Re: [GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
David

Thank you so much for reaching out.
The reason why I am using RabbitMQ client library is because I am familiar with it, but as you aware AMQP is a protocol-based specification therefore it doesn’t mater which client library is used as long as they are compliant with the protocol version and current implementation is based on AMQP 0.9.1. 
Also, you are mentioning QPID client libraries. Do you have an opinion which one is better since as I mentioned I just went with the one I know?. As far as QPID JMS, are you saying that you are using QPID JMS layer to make AMQP look like JMS? If so in my experience layering JMS over AMQP while possible brings a lot of limitations of JMS. In any event would be nice to hear your thoughts on that.
As far as your processor implementation. Sorry I didn’t have a chance to look at them at the time of writing this response (will look later on), but do you look at them as QPID specific (i.e., QPID vs RabbitMQ)? And if so what is in them that is specific to QPID? The reason why I am asking (and you can see it from discussion on JIRA) is that with this effort we are aiming for processors that are compliment with specific protocol regardless of the broker implementation used, so it must be neutral. 

Thanks for reaching out once again.
Cheers
Oleg

> On Feb 6, 2016, at 5:19 PM, DAVID SMITH <da...@btinternet.com> wrote:
> 
> Hi Guys
> |    | This email has been sent from a virus-free computer protected by Avast. 
> www.avast.com  |
> 
> 
> As you may remember I have developed some processors that publish/subscribe to AMQP brokers, but I was having problems writing Junit tests for these processors. I was interested to see that you have been working on NiFi Pull Request 865. I have looked at your code for these processors, we are both using different property descriptors to allow messages to be published and pulled. I also noticed that you are using RabbitMQ libraries to connect to the broker, whereas I connect to the AMQP broker using the QPID JMS libraries. I can still see a use for my processors and I would still be interested getting my processors uploaded to run alongside yours in a future release of NiFi.I have tidied up my code and pushed it back to github:
> https://github.com/helicopterman22/nifi_amqp_processors.git
> I would appreciate your feedback Dave
> 
> 
> 
>    On Wednesday, 3 February 2016, 2:05, asfgit <gi...@git.apache.org> wrote:
> 
> 
> Github user asfgit closed the pull request at:
> 
>     https://github.com/apache/nifi/pull/200
> 
> 
> ---
> If your project is set up for it, you can reply to this email and have your
> reply appear on GitHub as well. If your project does not have this feature
> enabled and wishes so, or if the feature is enabled but not working, please
> contact infrastructure at infrastructure@apache.org or file a JIRA ticket
> with INFRA.
> ---
> 
> 


Re: [GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by DAVID SMITH <da...@btinternet.com>.
Hi Guys
|    | This email has been sent from a virus-free computer protected by Avast. 
www.avast.com  |


As you may remember I have developed some processors that publish/subscribe to AMQP brokers, but I was having problems writing Junit tests for these processors. I was interested to see that you have been working on NiFi Pull Request 865. I have looked at your code for these processors, we are both using different property descriptors to allow messages to be published and pulled. I also noticed that you are using RabbitMQ libraries to connect to the broker, whereas I connect to the AMQP broker using the QPID JMS libraries. I can still see a use for my processors and I would still be interested getting my processors uploaded to run alongside yours in a future release of NiFi.I have tidied up my code and pushed it back to github:
https://github.com/helicopterman22/nifi_amqp_processors.git
I would appreciate your feedback Dave

 

    On Wednesday, 3 February 2016, 2:05, asfgit <gi...@git.apache.org> wrote:
 

 Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/200


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---


  

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/200


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on the pull request:

    https://github.com/apache/nifi/pull/200#issuecomment-178705663
  
    Oleg - I have not tested this with any real AMQP data, though I know others have. I have just done a review of the code itself. Several inline comments but aside from those, all looks good and I'm a +1. Thanks for hammering this out - definitely a valuable tool to add to the toolbox!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/200#issuecomment-178641022
  
    Sorry, the S2 naming habit. Will change 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the pull request:

    https://github.com/apache/nifi/pull/200#issuecomment-178640104
  
    I noticed none of the tests run when I build from the command line. I think this is due to the tests all ending with "Tests", and we only pick up SomeProcessorTest or TestSomeProcessor (the latter is more common in the code base). 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51600426
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.rabbitmq.client.Channel;
    +import com.rabbitmq.client.Connection;
    +
    +/**
    + * Base class for implementing publishing and consuming AMQP workers.
    + *
    + * @see AMQPPublisher
    + * @see AMQPConsumer
    + */
    +abstract class AMQPWorker implements AutoCloseable {
    +
    +    private final static Logger logger = LoggerFactory.getLogger(AMQPWorker.class);
    +
    +    protected final Channel channel;
    +
    +    /**
    +     * Creates an instance of this worker initializing it with AMQP
    +     * {@link Connection} and creating a target {@link Channel} used by
    +     * sub-classes to interruct with AMQP-based messaging system.
    --- End diff --
    
    typo - 'interruct'


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the pull request:

    https://github.com/apache/nifi/pull/200#issuecomment-178920339
  
    @olegz I think this is good to go from a review perspective. There appears to be some conflicts in pom files now because another bundle just got added. Can you rebase to master and squash commits and then we'll get this merged in?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51600206
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.lang.reflect.Method;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Properties;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.rabbitmq.client.AMQP;
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +
    +/**
    + * Publishing AMQP processor which upon each invocation of
    + * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct an
    + * AMQP message sending it to an exchange identified during construction of this
    + * class while transferring the incoming {@link FlowFile} to 'success'
    + * {@link Relationship}.
    + *
    + * Expects that queues, exchanges and bindings are pre-defined by an AMQP
    + * administrator
    + */
    +@Tags({ "amqp", "rabbit", "put", "message", "send", "publish" })
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Creates a AMQP Message from the contents of a FlowFile and sends the message to an AMQP Server")
    +public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
    +
    +    public static final PropertyDescriptor EXCHANGE = new PropertyDescriptor.Builder()
    +            .name("Exchange Name")
    +            .description("The name of the AMQP Exchange the messages will be sent to. Usually provided by the AMQP administrator (e.g., 'amq.direct'). "
    +                    + "It is an optional property. If kept empty the messages will be sent to a default AMQP exchange.")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +    public static final PropertyDescriptor ROUNTING_KEY = new PropertyDescriptor.Builder()
    +            .name("Routing Key")
    +            .description("The name of the Routing Key that will be used by AMQP to route messages from the exchange to a destination queue(s). "
    +                    + "Usually provided by the administrator (e.g., 'myKey')In the event when messages are sent to a default exchange this property "
    +                    + "corresponds to a destination queue name, otherwise a binding from the Exchange to a Queue via Routing Key must be set "
    +                    + "(usually by the AMQP administrator)")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All FlowFiles that are sent to the AMQP destination are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("All FlowFiles that cannot be routed to the AMQP destination are routed to this relationship")
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    private final static Set<Relationship> relationships;
    +
    +    private final static List<String> amqpPropertyNames = AMQPUtils.getAmqpPropertyNames();
    +
    +    /*
    +     * Will ensure that the list of property descriptors is build only once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(EXCHANGE);
    +        _propertyDescriptors.add(ROUNTING_KEY);
    +        _propertyDescriptors.addAll(descriptors);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    /**
    +     * Will construct AMQP message by extracting its body from the incoming
    +     * {@link FlowFile}. AMQP {@link Properties} will be extracted from the
    +     * {@link FlowFile} and converted to {@link BasicProperties} to be sent
    +     * along with the message. Upon success the incoming {@link FlowFile} is
    +     * transfered to 'success' {@link Relationship} and upon failure FlowFile is
    +     * penalized and transfered to the 'failure' {@link Relationship}
    +     * <br>
    +     * NOTE: Attributes extracted from {@link FlowFile} are considered
    +     * candidates for AMQP properties if their names are prefixed with
    +     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
    +     *
    +     */
    +    @Override
    +    protected void rendezvousWithAmqp(ProcessContext context, ProcessSession processSession) throws ProcessException {
    +        FlowFile flowFile = processSession.get();
    +        if (flowFile != null) {
    +            BasicProperties amqpProperties = this.extractAmqpPropertiesFromFlowFile(flowFile);
    +
    +            byte[] messageContent = this.extractMessage(flowFile, processSession);
    +
    +            try {
    +                this.targetResource.publish(messageContent, amqpProperties);
    +                processSession.transfer(flowFile, REL_SUCCESS);
    +            } catch (Exception e) {
    +                processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
    +                processSession.getProvenanceReporter().receive(flowFile,
    +                        this.amqpConnection.toString() + "/E:" + context.getProperty(EXCHANGE).getValue() + "/RK:"
    +                                + context.getProperty(ROUNTING_KEY).getValue());
    +                this.getLogger().error("Failed while sending message to AMQP via " + this.targetResource, e);
    +                context.yield();
    +            }
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    /**
    +     * Will create an instance of {@link AMQPPublisher}
    +     */
    +    @Override
    +    protected AMQPPublisher finishBuildingTargetResource(ProcessContext context) {
    +        String exchangeName = context.getProperty(EXCHANGE).getValue();
    +        String routingKey = context.getProperty(ROUNTING_KEY).getValue();
    +        return new AMQPPublisher(this.amqpConnection, exchangeName, routingKey, this.getLogger());
    +    }
    +
    +    /**
    +     * Extracts contents of the {@link FlowFile} as byte array.
    +     */
    +    private byte[] extractMessage(FlowFile flowFile, ProcessSession session){
    +        final byte[] messageContent = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, messageContent, true);
    +            }
    +        });
    +        return messageContent;
    +    }
    +
    +    /**
    +     * Extracts AMQP properties from the {@link FlowFile} attributes. Attributes
    +     * extracted from {@link FlowFile} are considered candidates for AMQP
    +     * properties if their names are prefixed with
    +     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
    +     */
    +    private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile) {
    +        Map<String, String> attributes = flowFile.getAttributes();
    +        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
    +        for (Entry<String, String> attributeEntry : attributes.entrySet()) {
    +            if (attributeEntry.getKey().startsWith(AMQPUtils.AMQP_PROP_PREFIX)) {
    +                String amqpPropName = attributeEntry.getKey().split("\\" + AMQPUtils.AMQP_PROP_DELIMITER)[1];
    +                String amqpPropValue = attributeEntry.getValue();
    +                System.out.println(amqpPropertyNames);
    --- End diff --
    
    oops


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51595280
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Modifier;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +
    +/**
    + * Utility helper class simplify interructions with target AMQP API and NIFI
    + * API.
    + *
    + */
    +abstract class AMQPUtils {
    +
    +    public final static String AMQP_PROP_DELIMITER = "$";
    +
    +    public final static String AMQP_PROP_PREFIX = "amqp" + AMQP_PROP_DELIMITER;
    +
    +    private final static Logger logger = LoggerFactory.getLogger(AMQPUtils.class);
    +
    +    private final static List<String> propertyNames = Arrays.asList("amqp$contentType", "amqp$contentEncoding",
    +            "amqp$headers", "amqp$deliveryMode", "amqp$priority", "amqp$correlationId", "amqp$replyTo",
    +            "amqp$expiration", "amqp$messageId", "amqp$timestamp", "amqp$type", "amqp$userId", "amqp$appId",
    +            "amqp$clusterId");
    +    /**
    +     * Returns a {@link List} of AMQP property names defined in
    +     * {@link BasicProperties}
    +     */
    +    public static List<String> getAmqpPropertyNames() {
    +        return propertyNames;
    +    }
    +
    +    /**
    +     * Updates {@link FlowFile} with attributes representing AMQP properties
    +     *
    +     * @param amqpProperties
    +     *            instance of {@link BasicProperties}
    +     * @param flowFile
    +     *            instance of target {@link FlowFile}
    +     * @param processSession
    +     *            instance of {@link ProcessSession}
    +     */
    +    public static FlowFile updateFlowFileAttributesWithAmqpProperties(BasicProperties amqpProperties, FlowFile flowFile, ProcessSession processSession) {
    --- End diff --
    
    This all feels very complicated to me. There are only 14 different things we care about here. Can't we just call the 14 getters, rather than writing a 115 line class that uses reflection?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51593685
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +import com.rabbitmq.client.GetResponse;
    +
    +/**
    + * Consuming AMQP processor which upon each invocation of
    + * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct a
    + * {@link FlowFile} containing the body of the consumed AMQP message and AMQP
    + * properties that came with message which are added to a {@link FlowFile} as
    + * attributes.
    + */
    +@Tags({ "amqp", "rabbit", "get", "message", "receive", "consume" })
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("Creates a AMQP Message from the contents of a FlowFile and sends the message to an AMQP Server")
    +public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
    +
    +    public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
    +            .name("Queue")
    +            .description("Source queue")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All FlowFiles that are received from the AMQP queue are routed to this relationship")
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    private final static Set<Relationship> relationships;
    +
    +    /*
    +     * Will ensure that the list of property descriptors is build only once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(QUEUE);
    +        _propertyDescriptors.addAll(descriptors);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    /**
    +     * Will construct a {@link FlowFile} containing the body of the consumed
    +     * AMQP message (if {@link GetResponse} returned by {@link AMQPConsumer} is
    +     * not null) and AMQP properties that came with message which are added to a
    +     * {@link FlowFile} as attributes, transferring {@link FlowFile} to
    +     * 'success' {@link Relationship}.
    +     */
    +    @Override
    +    protected void rendezvousWithAmqp(ProcessContext context, ProcessSession processSession) throws ProcessException {
    +        final GetResponse response = this.targetResource.consume();
    +        if (response != null){
    +            FlowFile flowFile = processSession.create();
    +            flowFile = processSession.append(flowFile, new OutputStreamCallback() {
    --- End diff --
    
    I would change this to processSession.write - because the FlowFile has no content, the result will be the same (i.e., they have the same semantics) but the internal workings will actually be a little more efficient (and the code would be more consistent with what other processors do) if this is session.write


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51602840
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Modifier;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +
    +/**
    + * Utility helper class simplify interructions with target AMQP API and NIFI
    + * API.
    + *
    + */
    +abstract class AMQPUtils {
    +
    +    public final static String AMQP_PROP_DELIMITER = "$";
    +
    +    public final static String AMQP_PROP_PREFIX = "amqp" + AMQP_PROP_DELIMITER;
    +
    +    private final static Logger logger = LoggerFactory.getLogger(AMQPUtils.class);
    +
    +    private final static List<String> propertyNames = Arrays.asList("amqp$contentType", "amqp$contentEncoding",
    --- End diff --
    
    On the flip side, if there is no amqp$contentType attribute but there is a mime.type attribute, should consider having PublishAMQP add the value of the mime.type attribute?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/200#issuecomment-178347191
  
    Will fix as well as the merge conflict


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51593879
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java ---
    @@ -0,0 +1,204 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import com.rabbitmq.client.Connection;
    +import com.rabbitmq.client.ConnectionFactory;
    +
    +/**
    + * Base processor that uses RabbitMQ client API
    + * (https://www.rabbitmq.com/api-guide.html) to rendezvous with AMQP-based
    + * messaging systems version 0.9.1
    + *
    + * @param <T> the type of {@link AMQPWorker}. Please see {@link AMQPPublisher}
    + *            and {@link AMQPConsumer}
    + */
    +abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
    +            .name("Host Name")
    +            .description("Network address of AMQP broker (e.g., localhost)")
    +            .required(true)
    +            .defaultValue("localhost")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
    +            .name("Port")
    +            .description("Numeric value identifying Port of AMQP broker (e.g., 5671)")
    +            .required(true)
    +            .defaultValue("5672")
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor V_HOST = new PropertyDescriptor.Builder()
    +            .name("Virtual Host")
    +            .description("Virtual Host name which segregates AMQP system for enhanced security.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
    +            .name("User Name")
    +            .description("User Name used for authentication and authorization.")
    +            .required(true)
    +            .defaultValue("guest")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
    +            .name("Password")
    +            .description("Password used for authentication and authorization.")
    +            .required(true)
    +            .defaultValue("guest")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .sensitive(true)
    +            .build();
    +    public static final PropertyDescriptor AMQP_VERSION = new PropertyDescriptor.Builder()
    +            .name("AMQP Version")
    +            .description("AMQP Version. Currently only supports AMQP v0.9.1.")
    +            .required(true)
    +            .allowableValues("0.9.1")
    +            .defaultValue("0.9.1")
    +            .build();
    +
    +    static List<PropertyDescriptor> descriptors = new ArrayList<>();
    +
    +    /*
    +     * Will ensure that list of PropertyDescriptors is build only once, since
    +     * all other lifecycle methods are invoked multiple times
    +     */
    +    static {
    --- End diff --
    
    Well, you are the first who got confused ;). In any event, my motivation for now is that I want to include them all, so I will edit a comment above to ensure that it's clear and that any additional property will be added automatically. In the event this thing will get extended in the future we can always go back to your suggestion, but for now its less code to maintain ;) and that is the motivation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51604497
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.rabbitmq.client.Channel;
    +import com.rabbitmq.client.Connection;
    +
    +/**
    + * Base class for implementing publishing and consuming AMQP workers.
    + *
    + * @see AMQPPublisher
    + * @see AMQPConsumer
    + */
    +abstract class AMQPWorker implements AutoCloseable {
    +
    +    private final static Logger logger = LoggerFactory.getLogger(AMQPWorker.class);
    +
    +    protected final Channel channel;
    +
    +    /**
    +     * Creates an instance of this worker initializing it with AMQP
    +     * {@link Connection} and creating a target {@link Channel} used by
    +     * sub-classes to interruct with AMQP-based messaging system.
    +     *
    +     * @param connection
    +     *            instance of {@link Connection}
    +     */
    +    public AMQPWorker(Connection connection) {
    +        this.validateConnection(connection);
    +        try {
    +            this.channel = connection.createChannel();
    +        } catch (IOException e) {
    +            logger.error("Failed to create Channel for " + connection, e);
    +            throw new IllegalStateException(e);
    +        }
    +    }
    +
    +    /**
    +     * Closes {@link Channel} created when instance of this class was created.
    +     */
    +    @Override
    +    public void close() throws Exception {
    --- End diff --
    
    It does - but I personally always prefer to narrow the Exceptions that I am throwing when I override a method, if I can. This way, if someone is using the concrete implementation, they know exactly what Exceptions may be thrown.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51599382
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.lang.reflect.Method;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Properties;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.rabbitmq.client.AMQP;
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +
    +/**
    + * Publishing AMQP processor which upon each invocation of
    + * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct an
    + * AMQP message sending it to an exchange identified during construction of this
    + * class while transferring the incoming {@link FlowFile} to 'success'
    + * {@link Relationship}.
    + *
    + * Expects that queues, exchanges and bindings are pre-defined by an AMQP
    + * administrator
    + */
    +@Tags({ "amqp", "rabbit", "put", "message", "send", "publish" })
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Creates a AMQP Message from the contents of a FlowFile and sends the message to an AMQP Server")
    +public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
    +
    +    public static final PropertyDescriptor EXCHANGE = new PropertyDescriptor.Builder()
    +            .name("Exchange Name")
    +            .description("The name of the AMQP Exchange the messages will be sent to. Usually provided by the AMQP administrator (e.g., 'amq.direct'). "
    +                    + "It is an optional property. If kept empty the messages will be sent to a default AMQP exchange.")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +    public static final PropertyDescriptor ROUNTING_KEY = new PropertyDescriptor.Builder()
    +            .name("Routing Key")
    +            .description("The name of the Routing Key that will be used by AMQP to route messages from the exchange to a destination queue(s). "
    +                    + "Usually provided by the administrator (e.g., 'myKey')In the event when messages are sent to a default exchange this property "
    +                    + "corresponds to a destination queue name, otherwise a binding from the Exchange to a Queue via Routing Key must be set "
    +                    + "(usually by the AMQP administrator)")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All FlowFiles that are sent to the AMQP destination are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("All FlowFiles that cannot be routed to the AMQP destination are routed to this relationship")
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    private final static Set<Relationship> relationships;
    +
    +    private final static List<String> amqpPropertyNames = AMQPUtils.getAmqpPropertyNames();
    +
    +    /*
    +     * Will ensure that the list of property descriptors is build only once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(EXCHANGE);
    +        _propertyDescriptors.add(ROUNTING_KEY);
    +        _propertyDescriptors.addAll(descriptors);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    /**
    +     * Will construct AMQP message by extracting its body from the incoming
    +     * {@link FlowFile}. AMQP {@link Properties} will be extracted from the
    +     * {@link FlowFile} and converted to {@link BasicProperties} to be sent
    +     * along with the message. Upon success the incoming {@link FlowFile} is
    +     * transfered to 'success' {@link Relationship} and upon failure FlowFile is
    +     * penalized and transfered to the 'failure' {@link Relationship}
    +     * <br>
    +     * NOTE: Attributes extracted from {@link FlowFile} are considered
    +     * candidates for AMQP properties if their names are prefixed with
    +     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
    +     *
    +     */
    +    @Override
    +    protected void rendezvousWithAmqp(ProcessContext context, ProcessSession processSession) throws ProcessException {
    +        FlowFile flowFile = processSession.get();
    +        if (flowFile != null) {
    +            BasicProperties amqpProperties = this.extractAmqpPropertiesFromFlowFile(flowFile);
    +
    +            byte[] messageContent = this.extractMessage(flowFile, processSession);
    +
    +            try {
    +                this.targetResource.publish(messageContent, amqpProperties);
    +                processSession.transfer(flowFile, REL_SUCCESS);
    +            } catch (Exception e) {
    +                processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
    +                processSession.getProvenanceReporter().receive(flowFile,
    +                        this.amqpConnection.toString() + "/E:" + context.getProperty(EXCHANGE).getValue() + "/RK:"
    +                                + context.getProperty(ROUNTING_KEY).getValue());
    +                this.getLogger().error("Failed while sending message to AMQP via " + this.targetResource, e);
    +                context.yield();
    +            }
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    /**
    +     * Will create an instance of {@link AMQPPublisher}
    +     */
    +    @Override
    +    protected AMQPPublisher finishBuildingTargetResource(ProcessContext context) {
    +        String exchangeName = context.getProperty(EXCHANGE).getValue();
    +        String routingKey = context.getProperty(ROUNTING_KEY).getValue();
    +        return new AMQPPublisher(this.amqpConnection, exchangeName, routingKey, this.getLogger());
    +    }
    +
    +    /**
    +     * Extracts contents of the {@link FlowFile} as byte array.
    +     */
    +    private byte[] extractMessage(FlowFile flowFile, ProcessSession session){
    +        final byte[] messageContent = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, messageContent, true);
    +            }
    +        });
    +        return messageContent;
    +    }
    +
    +    /**
    +     * Extracts AMQP properties from the {@link FlowFile} attributes. Attributes
    +     * extracted from {@link FlowFile} are considered candidates for AMQP
    +     * properties if their names are prefixed with
    +     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
    +     */
    +    private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile) {
    +        Map<String, String> attributes = flowFile.getAttributes();
    +        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
    +        for (Entry<String, String> attributeEntry : attributes.entrySet()) {
    +            if (attributeEntry.getKey().startsWith(AMQPUtils.AMQP_PROP_PREFIX)) {
    +                String amqpPropName = attributeEntry.getKey().split("\\" + AMQPUtils.AMQP_PROP_DELIMITER)[1];
    +                String amqpPropValue = attributeEntry.getValue();
    +                System.out.println(amqpPropertyNames);
    --- End diff --
    
    I think you forgot to remove this? Or change to debug level log message perhaps?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51593209
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java ---
    @@ -0,0 +1,204 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import com.rabbitmq.client.Connection;
    +import com.rabbitmq.client.ConnectionFactory;
    +
    +/**
    + * Base processor that uses RabbitMQ client API
    + * (https://www.rabbitmq.com/api-guide.html) to rendezvous with AMQP-based
    + * messaging systems version 0.9.1
    + *
    + * @param <T> the type of {@link AMQPWorker}. Please see {@link AMQPPublisher}
    + *            and {@link AMQPConsumer}
    + */
    +abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
    +            .name("Host Name")
    +            .description("Network address of AMQP broker (e.g., localhost)")
    +            .required(true)
    +            .defaultValue("localhost")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
    +            .name("Port")
    +            .description("Numeric value identifying Port of AMQP broker (e.g., 5671)")
    +            .required(true)
    +            .defaultValue("5672")
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor V_HOST = new PropertyDescriptor.Builder()
    +            .name("Virtual Host")
    +            .description("Virtual Host name which segregates AMQP system for enhanced security.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
    +            .name("User Name")
    +            .description("User Name used for authentication and authorization.")
    +            .required(true)
    +            .defaultValue("guest")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
    +            .name("Password")
    +            .description("Password used for authentication and authorization.")
    +            .required(true)
    +            .defaultValue("guest")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .sensitive(true)
    +            .build();
    +    public static final PropertyDescriptor AMQP_VERSION = new PropertyDescriptor.Builder()
    +            .name("AMQP Version")
    +            .description("AMQP Version. Currently only supports AMQP v0.9.1.")
    +            .required(true)
    +            .allowableValues("0.9.1")
    +            .defaultValue("0.9.1")
    +            .build();
    +
    +    static List<PropertyDescriptor> descriptors = new ArrayList<>();
    +
    +    /*
    +     * Will ensure that list of PropertyDescriptors is build only once, since
    +     * all other lifecycle methods are invoked multiple times
    +     */
    +    static {
    --- End diff --
    
    It took me a while to figure out what was going on in this block. I find this far more confusing than just explicitly stating the PropertyDescriptors to include.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51598495
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.lang.reflect.Method;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Properties;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.rabbitmq.client.AMQP;
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +
    +/**
    + * Publishing AMQP processor which upon each invocation of
    + * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct an
    + * AMQP message sending it to an exchange identified during construction of this
    + * class while transferring the incoming {@link FlowFile} to 'success'
    + * {@link Relationship}.
    + *
    + * Expects that queues, exchanges and bindings are pre-defined by an AMQP
    + * administrator
    + */
    +@Tags({ "amqp", "rabbit", "put", "message", "send", "publish" })
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Creates a AMQP Message from the contents of a FlowFile and sends the message to an AMQP Server")
    +public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
    +
    +    public static final PropertyDescriptor EXCHANGE = new PropertyDescriptor.Builder()
    --- End diff --
    
    This PropertyDescriptor confuses me. It indicates in the description that it is optional but is then marked required, and allows an empty string. Does it make more sense to mark it as not required and then use a NON_EMPTY_VALIDATOR with no default?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51594383
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java ---
    @@ -0,0 +1,204 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import com.rabbitmq.client.Connection;
    +import com.rabbitmq.client.ConnectionFactory;
    +
    +/**
    + * Base processor that uses RabbitMQ client API
    + * (https://www.rabbitmq.com/api-guide.html) to rendezvous with AMQP-based
    + * messaging systems version 0.9.1
    + *
    + * @param <T> the type of {@link AMQPWorker}. Please see {@link AMQPPublisher}
    + *            and {@link AMQPConsumer}
    + */
    +abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
    +            .name("Host Name")
    +            .description("Network address of AMQP broker (e.g., localhost)")
    +            .required(true)
    +            .defaultValue("localhost")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
    +            .name("Port")
    +            .description("Numeric value identifying Port of AMQP broker (e.g., 5671)")
    +            .required(true)
    +            .defaultValue("5672")
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor V_HOST = new PropertyDescriptor.Builder()
    +            .name("Virtual Host")
    +            .description("Virtual Host name which segregates AMQP system for enhanced security.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
    +            .name("User Name")
    +            .description("User Name used for authentication and authorization.")
    +            .required(true)
    +            .defaultValue("guest")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
    +            .name("Password")
    +            .description("Password used for authentication and authorization.")
    +            .required(true)
    +            .defaultValue("guest")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .sensitive(true)
    +            .build();
    +    public static final PropertyDescriptor AMQP_VERSION = new PropertyDescriptor.Builder()
    +            .name("AMQP Version")
    +            .description("AMQP Version. Currently only supports AMQP v0.9.1.")
    +            .required(true)
    +            .allowableValues("0.9.1")
    +            .defaultValue("0.9.1")
    +            .build();
    +
    +    static List<PropertyDescriptor> descriptors = new ArrayList<>();
    +
    +    /*
    +     * Will ensure that list of PropertyDescriptors is build only once, since
    +     * all other lifecycle methods are invoked multiple times
    +     */
    +    static {
    --- End diff --
    
    I do agree that it is less code - however, the code is certainly more complex. I will favor code that is more verbose but simpler over code that is concise but complex any day.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51604778
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Modifier;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +
    +/**
    + * Utility helper class simplify interructions with target AMQP API and NIFI
    + * API.
    + *
    + */
    +abstract class AMQPUtils {
    +
    +    public final static String AMQP_PROP_DELIMITER = "$";
    +
    +    public final static String AMQP_PROP_PREFIX = "amqp" + AMQP_PROP_DELIMITER;
    +
    +    private final static Logger logger = LoggerFactory.getLogger(AMQPUtils.class);
    +
    +    private final static List<String> propertyNames = Arrays.asList("amqp$contentType", "amqp$contentEncoding",
    --- End diff --
    
    Consider yes, but let's not do it for this release. The user can still have UpdateAttributes upstream and map mime.type to contentType. Just want to make sure we avoid potential for collisions until we start getting some usage out of it



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51600893
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.lang.reflect.Method;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Properties;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.rabbitmq.client.AMQP;
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +
    +/**
    + * Publishing AMQP processor which upon each invocation of
    + * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct an
    + * AMQP message sending it to an exchange identified during construction of this
    + * class while transferring the incoming {@link FlowFile} to 'success'
    + * {@link Relationship}.
    + *
    + * Expects that queues, exchanges and bindings are pre-defined by an AMQP
    + * administrator
    + */
    +@Tags({ "amqp", "rabbit", "put", "message", "send", "publish" })
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Creates a AMQP Message from the contents of a FlowFile and sends the message to an AMQP Server")
    +public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
    +
    +    public static final PropertyDescriptor EXCHANGE = new PropertyDescriptor.Builder()
    --- End diff --
    
    Either is ok with me as long as the description lines up with how the PropertyDescriptor is configured. Just a little odd to claim that it's optional and then make it required :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51596690
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Modifier;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +
    +/**
    + * Utility helper class simplify interructions with target AMQP API and NIFI
    + * API.
    + *
    + */
    +abstract class AMQPUtils {
    +
    +    public final static String AMQP_PROP_DELIMITER = "$";
    +
    +    public final static String AMQP_PROP_PREFIX = "amqp" + AMQP_PROP_DELIMITER;
    +
    +    private final static Logger logger = LoggerFactory.getLogger(AMQPUtils.class);
    +
    +    private final static List<String> propertyNames = Arrays.asList("amqp$contentType", "amqp$contentEncoding",
    +            "amqp$headers", "amqp$deliveryMode", "amqp$priority", "amqp$correlationId", "amqp$replyTo",
    +            "amqp$expiration", "amqp$messageId", "amqp$timestamp", "amqp$type", "amqp$userId", "amqp$appId",
    +            "amqp$clusterId");
    +    /**
    +     * Returns a {@link List} of AMQP property names defined in
    +     * {@link BasicProperties}
    +     */
    +    public static List<String> getAmqpPropertyNames() {
    +        return propertyNames;
    +    }
    +
    +    /**
    +     * Updates {@link FlowFile} with attributes representing AMQP properties
    +     *
    +     * @param amqpProperties
    +     *            instance of {@link BasicProperties}
    +     * @param flowFile
    +     *            instance of target {@link FlowFile}
    +     * @param processSession
    +     *            instance of {@link ProcessSession}
    +     */
    +    public static FlowFile updateFlowFileAttributesWithAmqpProperties(BasicProperties amqpProperties, FlowFile flowFile, ProcessSession processSession) {
    --- End diff --
    
    It may very well, but its a utility class that hides the complexity so user only sees simplicity.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51605138
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.rabbitmq.client.Channel;
    +import com.rabbitmq.client.Connection;
    +
    +/**
    + * Base class for implementing publishing and consuming AMQP workers.
    + *
    + * @see AMQPPublisher
    + * @see AMQPConsumer
    + */
    +abstract class AMQPWorker implements AutoCloseable {
    +
    +    private final static Logger logger = LoggerFactory.getLogger(AMQPWorker.class);
    +
    +    protected final Channel channel;
    +
    +    /**
    +     * Creates an instance of this worker initializing it with AMQP
    +     * {@link Connection} and creating a target {@link Channel} used by
    +     * sub-classes to interruct with AMQP-based messaging system.
    +     *
    +     * @param connection
    +     *            instance of {@link Connection}
    +     */
    +    public AMQPWorker(Connection connection) {
    +        this.validateConnection(connection);
    +        try {
    +            this.channel = connection.createChannel();
    +        } catch (IOException e) {
    +            logger.error("Failed to create Channel for " + connection, e);
    +            throw new IllegalStateException(e);
    +        }
    +    }
    +
    +    /**
    +     * Closes {@link Channel} created when instance of this class was created.
    +     */
    +    @Override
    +    public void close() throws Exception {
    --- End diff --
    
    Actually Channel will throws more then IO, anyway, I'l change
    ```
    public void close() throws TimeoutException, IOException {
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/200#issuecomment-178928684
  
    @bbende done!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51602109
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.rabbitmq.client.Channel;
    +import com.rabbitmq.client.Connection;
    +
    +/**
    + * Base class for implementing publishing and consuming AMQP workers.
    + *
    + * @see AMQPPublisher
    + * @see AMQPConsumer
    + */
    +abstract class AMQPWorker implements AutoCloseable {
    +
    +    private final static Logger logger = LoggerFactory.getLogger(AMQPWorker.class);
    +
    +    protected final Channel channel;
    +
    +    /**
    +     * Creates an instance of this worker initializing it with AMQP
    +     * {@link Connection} and creating a target {@link Channel} used by
    +     * sub-classes to interruct with AMQP-based messaging system.
    +     *
    +     * @param connection
    +     *            instance of {@link Connection}
    +     */
    +    public AMQPWorker(Connection connection) {
    +        this.validateConnection(connection);
    +        try {
    +            this.channel = connection.createChannel();
    +        } catch (IOException e) {
    +            logger.error("Failed to create Channel for " + connection, e);
    +            throw new IllegalStateException(e);
    +        }
    +    }
    +
    +    /**
    +     * Closes {@link Channel} created when instance of this class was created.
    +     */
    +    @Override
    +    public void close() throws Exception {
    --- End diff --
    
    That comes from AutoCloseable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51600065
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.lang.reflect.Method;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Properties;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.rabbitmq.client.AMQP;
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +
    +/**
    + * Publishing AMQP processor which upon each invocation of
    + * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct an
    + * AMQP message sending it to an exchange identified during construction of this
    + * class while transferring the incoming {@link FlowFile} to 'success'
    + * {@link Relationship}.
    + *
    + * Expects that queues, exchanges and bindings are pre-defined by an AMQP
    + * administrator
    + */
    +@Tags({ "amqp", "rabbit", "put", "message", "send", "publish" })
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Creates a AMQP Message from the contents of a FlowFile and sends the message to an AMQP Server")
    +public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
    +
    +    public static final PropertyDescriptor EXCHANGE = new PropertyDescriptor.Builder()
    --- End diff --
    
    The motivation was that for 90% of the cases (in my experience) it will be required. No one sends messages to a default exchange in prod, so this approach IMHO kind of speaks to that. 
    Having said that I am ok with either, so let me know.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51602176
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Modifier;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +
    +/**
    + * Utility helper class simplify interructions with target AMQP API and NIFI
    + * API.
    + *
    + */
    +abstract class AMQPUtils {
    +
    +    public final static String AMQP_PROP_DELIMITER = "$";
    +
    +    public final static String AMQP_PROP_PREFIX = "amqp" + AMQP_PROP_DELIMITER;
    +
    +    private final static Logger logger = LoggerFactory.getLogger(AMQPUtils.class);
    +
    +    private final static List<String> propertyNames = Arrays.asList("amqp$contentType", "amqp$contentEncoding",
    +            "amqp$headers", "amqp$deliveryMode", "amqp$priority", "amqp$correlationId", "amqp$replyTo",
    +            "amqp$expiration", "amqp$messageId", "amqp$timestamp", "amqp$type", "amqp$userId", "amqp$appId",
    +            "amqp$clusterId");
    +    /**
    +     * Returns a {@link List} of AMQP property names defined in
    +     * {@link BasicProperties}
    +     */
    +    public static List<String> getAmqpPropertyNames() {
    +        return propertyNames;
    +    }
    +
    +    /**
    +     * Updates {@link FlowFile} with attributes representing AMQP properties
    +     *
    +     * @param amqpProperties
    +     *            instance of {@link BasicProperties}
    +     * @param flowFile
    +     *            instance of target {@link FlowFile}
    +     * @param processSession
    +     *            instance of {@link ProcessSession}
    +     */
    +    public static FlowFile updateFlowFileAttributesWithAmqpProperties(BasicProperties amqpProperties, FlowFile flowFile, ProcessSession processSession) {
    --- End diff --
    
    I definitely think it's a lot cleaner to just call the getter's explicitly. However, if i you feel strongly that this is the way to go here, I won't hold you back.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51601774
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java ---
    @@ -0,0 +1,134 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +import com.rabbitmq.client.Connection;
    +import com.rabbitmq.client.ReturnListener;
    +
    +/**
    + * Generic publisher of messages to AMQP-based messaging system. It is based on
    + * RabbitMQ client API (https://www.rabbitmq.com/api-guide.html)
    + */
    +final class AMQPPublisher extends AMQPWorker {
    +
    +    private final static Logger logger = LoggerFactory.getLogger(AMQPPublisher.class);
    +
    +    private final String exchangeName;
    +
    +    private final String routingKey;
    +
    +    private final ProcessorLog processLog;
    +
    +    /**
    +     * Creates an instance of this publisher
    +     *
    +     * @param connection
    +     *            instance of AMQP {@link Connection}
    +     * @param exchangeName
    +     *            the name of AMQP exchange to which messages will be published.
    +     *            If not provided 'default' exchange will be used.
    +     * @param routingKey
    +     *            (required) the name of the routingKey to be used by AMQP-based
    +     *            system to route messages to its final destination (queue).
    +     */
    +    AMQPPublisher(Connection connection, String exchangeName, String routingKey, ProcessorLog processLog) {
    +        super(connection);
    +        this.processLog = processLog;
    +        this.validateStringProperty("routingKey", routingKey);
    +        this.exchangeName = exchangeName == null ? "" : exchangeName.trim();
    +        if (this.exchangeName.length() == 0) {
    +            logger.info("The 'exchangeName' is not specified. Messages will be sent to default exchange");
    +        }
    +
    +        this.routingKey = routingKey;
    +        this.channel.addReturnListener(new UndeliverableMessageLogger());
    +        logger.info("Successfully connected AMQPPublisher to " + connection.toString() + " and '" + this.exchangeName
    +                + "' exchange with '" + routingKey + "' as a routing key.");
    +    }
    +
    +    /**
    +     * Publishes message without any AMQP properties (see
    +     * {@link BasicProperties}) to a pre-defined AMQP Exchange.
    +     *
    +     * @param bytes
    +     *            bytes representing a message.
    +     */
    +    void publish(byte[] bytes) {
    +        this.publish(bytes, null);
    +    }
    +
    +    /**
    +     * Publishes message with provided AMQP properties (see
    +     * {@link BasicProperties}) to a pre-defined AMQP Exchange.
    +     *
    +     * @param bytes
    +     *            bytes representing a message.
    +     * @param properties
    +     *            instance of {@link BasicProperties}
    +     */
    +    void publish(byte[] bytes, BasicProperties properties) {
    +        if (this.channel.isOpen()) {
    +            try {
    +                this.channel.basicPublish(this.exchangeName, this.routingKey, true, properties, bytes);
    +            } catch (Exception e) {
    +                throw new IllegalStateException("Failed to publish to '" +
    +                        this.exchangeName + "' with '" + this.routingKey + "'.", e);
    +            }
    +        } else {
    +            throw new IllegalStateException("This instance of AMQPPublisher is invalid since "
    +                    + "its publishigChannel is closed");
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public String toString() {
    +        return super.toString() + ", EXCHANGE:" + this.exchangeName + ", ROUTING_KEY:" + this.routingKey;
    +    }
    +
    +    /**
    +     * Listener to listen and WARN-log undeliverable messages which are returned
    --- End diff --
    
    If I am understanding this comment correctly, it is possible, if the AMQP Broker is not setup properly, to lose messages? Is that accurate? If so, I think we need to be sure to explicitly state this in the PublishAMQP's @CapabilityDescription


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51603654
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Modifier;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +
    +/**
    + * Utility helper class simplify interructions with target AMQP API and NIFI
    + * API.
    + *
    + */
    +abstract class AMQPUtils {
    +
    +    public final static String AMQP_PROP_DELIMITER = "$";
    --- End diff --
    
    I actually was talking about it with @bbende . I actually did it on purpose to ensure that it doesn't collide with something that may not actually be AMQP property (e.g., 'amqp.message.sender=Mark Payne'). So it a way I've chosen a special character to limit the possibility of outside interference. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-865 added initial support for AMQP publish...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51602747
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Modifier;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +
    +/**
    + * Utility helper class simplify interructions with target AMQP API and NIFI
    + * API.
    + *
    + */
    +abstract class AMQPUtils {
    +
    +    public final static String AMQP_PROP_DELIMITER = "$";
    +
    +    public final static String AMQP_PROP_PREFIX = "amqp" + AMQP_PROP_DELIMITER;
    +
    +    private final static Logger logger = LoggerFactory.getLogger(AMQPUtils.class);
    +
    +    private final static List<String> propertyNames = Arrays.asList("amqp$contentType", "amqp$contentEncoding",
    --- End diff --
    
    When ConsumeAMQP creates a FlowFile with an attribute named 'amqp$contentType' - we should consider also writing that value to the 'mime.type' attribute, as this is the 'standard attribute' that we use to determine the content type. This is used, for instance, by some of the Content Viewers. So if it's text/json, for example, we know how to render that if viewing content via Provenance.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---