You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nifi.apache.org by KayLerch <gi...@git.apache.org> on 2016/04/13 22:51:12 UTC

[GitHub] nifi pull request: Nifi AWS IoT processors

GitHub user KayLerch opened a pull request:

    https://github.com/apache/nifi/pull/349

    Nifi AWS IoT processors

    [20160413_apache-nifi-aws-iot-pull-request_lerchkay.pdf](https://github.com/apache/nifi/files/217918/20160413_apache-nifi-aws-iot-pull-request_lerchkay.pdf)
    
    Four new processors to communicate with Amazon’s managed device gateway service AWS IoT. 
    
    **Use cases**
    * Consume reported states from a fleet of things managed and secured on Amazon’s gateway service
    * Propagate desired states to a fleet of things managed and secured on Amazon’s gateway service
    * Intercept M2M communication
    * Hybrid IoT solutions: brings together a managed device gateway in the cloud and onpremise data-consumers and -providers.  
    
    **GetIOTMqtt**:
    Opens up a connection to an AWS-account-specific websocket endpoint in order to subscribe to any of the MQTT topics belonging to a registered thing in AWS IoT.
    
    **PutIOTMqtt**
    Opens up a connection to an AWS-account-specific websocket endpoint in order to publish messages to any of the MQTT topics belonging to a registered thing in AWS IoT.
    
    **GetIOTShadow**
    In AWS IoT a physical thing is represented with its last reported state by the so-called thing shadow. This processor reads out the current state of a shadow (persisted as JSON) by requesting the managed API of AWS IoT.
    
    **PutIOTShadow**
    In AWS IoT a physical thing is represented with its last reported state by the so-called thing shadow. This processor updates the current state of a shadow (persisted as JSON) by requesting the managed API of AWS IoT. An update to a shadow lets AWS IoT propagate changes to the MQTT topics of the thing.
    
    Known issues: 
    * It was hard for me to write appropriate integration tests since the MQTT processors work with durable websocket-connections which are kind of tough to test. With your help I would love to do a better job on testing and hand it in later on. All of the processors were tested in a live-scenario which ran over a longer period of time. Didn’t observe any issue.
    * I got rid of all the properties for the deprecated AWSCredentialProviderService and only make use of AWSCredentialsProviderService. If both are still necessary for backward-compatibilit sake I would add the deprecated feature. 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/KayLerch/nifi nifi-aws-iot-processor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/349.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #349
    
----
commit f9d7df8d78d5625f96b27ade86f43f2a66a966dc
Author: Lerch, Kay <ka...@immobilienscout24.de>
Date:   2016-04-10T10:48:07Z

    Added IOT-Processors for reading and writing AWS IOT shadow and MQTT topics

commit 51ad5197383df745d65efe82f793da4c98c63b01
Author: Lerch, Kay <ka...@immobilienscout24.de>
Date:   2016-04-10T13:03:42Z

    Let abstract shadow processor use the correct AWSclient. Minor fixes due to configuration issues

commit 67460e11f3082a708073d4cccaa2d740adcf72ff
Author: Lerch, Kay <ka...@immobilienscout24.de>
Date:   2016-04-10T13:43:27Z

    Let abstract shadow processor use the correct AWSclient. Minor fixes due to configuration issues

commit 53c5855efabb0d6d2963e84e34be244e9ed47e54
Author: Lerch, Kay <ka...@immobilienscout24.de>
Date:   2016-04-10T14:38:47Z

    Minor fixes due to configuration issues

commit 380fe2ca0da9e58da2eec0ed2ac755479db00c2d
Author: Lerch, Kay <ka...@immobilienscout24.de>
Date:   2016-04-10T15:56:21Z

    Minor fixes due to configuration issues

commit e36c73cae456ef4ba572e4d44f6aa5e3e1157e20
Author: Lerch, Kay <ka...@immobilienscout24.de>
Date:   2016-04-10T16:23:34Z

    Adjusted properties for iot processors

commit 2c8eeb6f31f5fe37dcf4882296df77407e5e9da6
Author: Lerch, Kay <ka...@immobilienscout24.de>
Date:   2016-04-10T18:15:59Z

    Add UnitTest for GetIOTMqtt processor

commit fd6e41db3a9d19aa3a6d869e8782dc45589fdf66
Author: Lerch, Kay <ka...@immobilienscout24.de>
Date:   2016-04-10T20:30:51Z

    Added more UnitTests

commit c0c085c8d608c88f427c2a81aa26d6db8104a73a
Author: Lerch, Kay <ka...@immobilienscout24.de>
Date:   2016-04-12T15:30:33Z

    More detailed documentation

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/349
  
    For the new dependencies, NOTICE/LICENSE information needs to be added if applicable. Paho is EPL 1.0 licensed and requires being added to the nar NOTICE like so[1]. Do the other two new deps have license or notice information to add?
    
    [1] https://github.com/apache/nifi/blob/f47af1ce8336c9305916f00738976f3505b01b0b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/src/main/resources/META-INF/NOTICE#L29


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi AWS IoT processors

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/349#issuecomment-213803164
  
    @KayLerch not sure why the Travis build did not trigger for your PR.  I was scoping some of this out locally and see that some contrib issues came up (this can be evaluated locally with a `mvn clean install -Pcontrib-check`)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi AWS IoT processors

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/nifi/pull/349#issuecomment-214778406
  
    Errr..i messed that response up.  I meant I agree with the point @JPercivall was making and understand what @KayLerch was thinking.  I think the names should be
    
    GetAWSIoTMqtt
    PutAWSIoTMqtt
    GetAWSIoTShadow
    PutAWSIoTShadow
    
    The names have a lot going on but these are pretty purpose built for the AWS IoT platform.
    
    Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65421171
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/GetAWSIoT.java ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.aws.iot.util.IoTMessage;
    +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.LinkedList;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +@Tags({"Amazon", "AWS", "IOT", "MQTT", "Websockets", "Get", "Subscribe", "Receive"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("Subscribes to and receives messages from MQTT-topic(s) of AWS IoT." +
    +    "The processor keeps open a WebSocket connection and will automatically renew the " +
    +    "connection to overcome Amazon's service limit on maximum connection duration. Depending on " +
    +    "your set up QoS the processor will miss some messages (QoS=0) or receives messages twice (QoS=1) " +
    +    "while reconnecting to AWS IoT WebSocket endpoint. We strongly recommend you to make use of " +
    +    "processor isolation as concurrent subscriptions to an MQTT topic result in multiple message receiptions.")
    +@SeeAlso({ GetAWSIoTShadow.class })
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.mqtt.endpoint", description = "AWS endpoint this message was received from."),
    +        @WritesAttribute(attribute = "aws.iot.mqtt.topic", description = "MQTT topic this message was received from."),
    +        @WritesAttribute(attribute = "aws.iot.mqtt.client", description = "MQTT client which received the message."),
    +        @WritesAttribute(attribute = "aws.iot.mqtt.qos", description = "Underlying MQTT quality-of-service.")
    +})
    +public class GetAWSIoT extends AbstractAWSIoTProcessor {
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_QOS,
    +                    PROP_TOPIC,
    +                    PROP_ENDPOINT,
    +                    PROP_KEEPALIVE,
    +                    PROP_CLIENT,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    REGION));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        // init to build up mqtt connection over web-sockets
    +        init(context);
    +        if (mqttClient != null && mqttClient.isConnected()) {
    +            try {
    +                // subscribe to topic with configured qos in order to start receiving messages
    +                mqttClient.subscribe(awsTopic, awsQos);
    +            } catch (MqttException e) {
    +                getLogger().error("Error while subscribing to topic " + awsTopic + " with client-id " + mqttClient.getClientId() + " caused by " + e.getMessage());
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        final List messageList = new LinkedList();
    +        // check if connection is about to terminate
    +        if (isConnectionAboutToExpire()) {
    +            MqttWebSocketAsyncClient _mqttClient = null;
    +            try {
    +                // before subscribing to the topic with new connection first unsubscribe
    +                // old connection from same topic if subscription is set to QoS 0
    +                if (awsQos == 0) mqttClient.unsubscribe(awsTopic);
    +                // establish a second connection
    +                _mqttClient = connect(context);
    +                // now subscribe to topic with new connection
    +                _mqttClient.subscribe(awsTopic, awsQos);
    +                // between re-subscription and disconnect from old connection
    +                // QoS=0 subscription eventually lose some messages
    +                // QoS=1 subscription eventually receive some messages twice
    +                // now terminate old connection
    +                mqttClient.disconnect();
    +            } catch (MqttException e) {
    +                getLogger().error("Error while renewing connection with client " + mqttClient.getClientId() + " caused by " + e.getMessage());
    +            } finally {
    +                // grab messages left over from old connection
    +                mqttClient.getAwsQueuedMqttMessages().drainTo(messageList);
    +                // now set the new connection as the default connection
    +                if (_mqttClient != null) mqttClient = _mqttClient;
    +            }
    +        } else {
    +            // grab messages which queued up since last run
    +            mqttClient.getAwsQueuedMqttMessages().drainTo(messageList);
    +        }
    +
    +        if (messageList.isEmpty()) return;
    +
    +        for (Object aMessageList : messageList) {
    +            FlowFile flowFile = session.create();
    +            final IoTMessage msg = (IoTMessage) aMessageList;
    +            final Map<String, String> attributes = new HashMap<>();
    +
    +            attributes.put(PROP_NAME_ENDPOINT, awsEndpoint);
    +            attributes.put(PROP_NAME_TOPIC, msg.getTopic());
    +            attributes.put(PROP_NAME_CLIENT, awsClientId);
    +            attributes.put(PROP_NAME_QOS, msg.getQos().toString());
    +            flowFile = session.putAllAttributes(flowFile, attributes);
    +
    +            flowFile = session.write(flowFile, new OutputStreamCallback() {
    +                @Override
    +                public void process(final OutputStream out) throws IOException {
    +                    out.write(msg.getPayload());
    +                }
    +            });
    +            session.transfer(flowFile, REL_SUCCESS);
    +            session.commit();
    --- End diff --
    
    A "RECEIVE" provenance event should emitted here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by KayLerch <gi...@git.apache.org>.
Github user KayLerch commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65421049
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTProcessor.java ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.iot.AWSIotClient;
    +import org.apache.commons.lang3.RandomStringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +import org.apache.nifi.processors.aws.iot.util.AWS4Signer;
    +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.util.Date;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class AbstractAWSIoTProcessor extends AbstractAWSCredentialsProviderProcessor<AWSIotClient> {
    +    static final String PROP_NAME_ENDPOINT = "aws.iot.endpoint";
    +    static final String PROP_NAME_CLIENT = "aws.iot.mqtt.client";
    +    static final String PROP_NAME_KEEPALIVE = "aws.iot.mqtt.keepalive";
    +    static final String PROP_NAME_TOPIC = "aws.iot.mqtt.topic";
    +    static final String PROP_NAME_QOS = "aws.iot.mqtt.qos";
    --- End diff --
    
    It's just a convention I prefer as a user. It consistently states the specific context a property needs to be set in. It is also consistent with the names of the flowfile properties which optionally override these property values.
    Anyway, if this is a matter I'll give the properties a more meaningful title. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65405495
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/MqttWebSocketAsyncClient.java ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot.util;
    +
    +import java.net.URI;
    +import java.util.concurrent.LinkedBlockingQueue;
    +
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttCallback;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +import org.eclipse.paho.client.mqttv3.TimerPingSender;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttSecurityException;
    +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    +import org.eclipse.paho.client.mqttv3.MqttMessage;
    +import org.eclipse.paho.client.mqttv3.internal.NetworkModule;
    +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    +
    +public class MqttWebSocketAsyncClient extends MqttAsyncClient implements MqttCallback {
    +
    +    protected volatile LinkedBlockingQueue<IoTMessage> awsQueuedMqttMessages = new LinkedBlockingQueue<IoTMessage>();
    +    protected final ProcessorLog logger;
    +    protected final String serverURI;
    +
    +    protected static String createDummyURI(String original) {
    +        if (!original.startsWith("ws:") && !original.startsWith("wss:")) {
    +            return original;
    +        }
    +        final URI uri = URI.create(original);
    +        return "tcp://DUMMY-" + uri.getHost() + ":"
    +                + (uri.getPort() > 0 ? uri.getPort() : 80);
    +    }
    +
    +    protected static boolean isDummyURI(String uri) {
    +        return uri.startsWith("tcp://DUMMY-");
    +    }
    +
    +    public MqttWebSocketAsyncClient(String serverURI, String clientId,
    +                                    ProcessorLog logger) throws MqttException {
    +        super(createDummyURI(serverURI), clientId, new MemoryPersistence(), new TimerPingSender());
    +        this.serverURI = serverURI;
    +        this.logger = logger;
    +        this.setCallback(this);
    +    }
    +
    +    @Override
    +    protected NetworkModule[] createNetworkModules(String address,
    +                                                   MqttConnectOptions options) throws MqttException{
    +        String[] serverURIs = options.getServerURIs();
    +        String[] array = serverURIs == null ? new String[] { address } :
    +            serverURIs.length == 0 ? new String[] { address }: serverURIs;
    +
    +        NetworkModule[] networkModules = new NetworkModule[array.length];
    +        for (int i = 0; i < array.length; i++) {
    +            networkModules[i] = createNetworkModule(array[i], options);
    +        }
    +        return networkModules;
    +    }
    +
    +    protected NetworkModule createNetworkModule(String input,
    +                                                MqttConnectOptions options) throws MqttException,
    +            MqttSecurityException {
    +        final String address = isDummyURI(input) ? this.serverURI : input;
    +        if (!address.startsWith("ws:") && !address.startsWith("wss:")) {
    +            return super.createNetworkModules(address, options)[0];
    +        }
    +
    +        final String subProtocol = (options.getMqttVersion() == MqttConnectOptions.MQTT_VERSION_3_1) ? "mqttv3.1" : "mqtt";
    +        return newWebSocketNetworkModule(URI.create(address), subProtocol, options);
    +    }
    +
    +    protected NetworkModule newWebSocketNetworkModule(URI uri,
    +                                                      String subProtocol, MqttConnectOptions options) {
    +        final WebSocketNetworkModule netModule = new WebSocketNetworkModule(
    +                uri, subProtocol, getClientId());
    +        netModule.setConnectTimeout(options.getConnectionTimeout());
    +        return netModule;
    +    }
    +
    +    public LinkedBlockingQueue<IoTMessage> getAwsQueuedMqttMessages() {
    +        return awsQueuedMqttMessages;
    +    }
    +
    +    @Override
    +    public void connectionLost(Throwable t) {
    +        logger.error("Connection to " + this.getServerURI() + " lost with cause: " + t.getMessage());
    +    }
    +
    +    @Override
    +    public void deliveryComplete(IMqttDeliveryToken token) {
    +    }
    +
    +    @Override
    +    public void messageArrived(String topic, MqttMessage message) throws Exception {
    +        logger.info("Message arrived from topic: " + topic);
    --- End diff --
    
    This came up in my review as well (I did this at first), we shouldn't set the messageArrived message to INFO. The rate at which MQTT message potentially arrive would fill up the logs very quickly. Check out how I handle it in ConsumeMQTT[1].
    
    [1] https://github.com/apache/nifi/blob/f47af1ce8336c9305916f00738976f3505b01b0b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java#L310


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1767 AWS IoT processors

Posted by KayLerch <gi...@git.apache.org>.
Github user KayLerch commented on the pull request:

    https://github.com/apache/nifi/pull/349
  
    Is there anything I can help with getting this through?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #349: NIFI-1767 AWS IoT processors

Posted by trixpan <gi...@git.apache.org>.
Github user trixpan commented on the issue:

    https://github.com/apache/nifi/pull/349
  
    @joewitt - I am happy to adopt the stalled PR and address @JPercivall 's comments.
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi AWS IoT processors

Posted by KayLerch <gi...@git.apache.org>.
Github user KayLerch commented on the pull request:

    https://github.com/apache/nifi/pull/349#issuecomment-214763418
  
    @JPercivall Agree on that but made the naming decision based on an established convention to all the other AWS processors in this project. None of them indicates "AWS" in their names. This wasn't a problem until now as SQS, SNS, S3 and so on are really unique names. Which isn't true for IoT and MQTT. Wouldn't make the decision to break a naming convention on my own but would rather encourage the NiFi-team to think it over.
    
    @apiri As soon as I have the time I'll take a look. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by KayLerch <gi...@git.apache.org>.
Github user KayLerch commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65421951
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTProcessor.java ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.iot.AWSIotClient;
    +import org.apache.commons.lang3.RandomStringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +import org.apache.nifi.processors.aws.iot.util.AWS4Signer;
    +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.util.Date;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class AbstractAWSIoTProcessor extends AbstractAWSCredentialsProviderProcessor<AWSIotClient> {
    +    static final String PROP_NAME_ENDPOINT = "aws.iot.endpoint";
    +    static final String PROP_NAME_CLIENT = "aws.iot.mqtt.client";
    +    static final String PROP_NAME_KEEPALIVE = "aws.iot.mqtt.keepalive";
    +    static final String PROP_NAME_TOPIC = "aws.iot.mqtt.topic";
    +    static final String PROP_NAME_QOS = "aws.iot.mqtt.qos";
    +    /**
    +     * Amazon's current service limit on websocket connection duration
    +     */
    +    static final Integer PROP_DEFAULT_KEEPALIVE = 60 * 60 * 24;
    +    /**
    +     * When to start indicating the need for connection renewal (in seconds before actual termination)
    +     */
    +    static final Integer DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION = 20;
    +    static final String PROP_DEFAULT_CLIENT = AbstractAWSIoTProcessor.class.getSimpleName();
    +    /**
    +     * Default QoS level for message delivery
    +     */
    +    static final Integer DEFAULT_QOS = 0;
    +    String awsTopic;
    +    int awsQos;
    +    MqttWebSocketAsyncClient mqttClient;
    +    String awsEndpoint;
    +    String awsClientId;
    +
    +    private String awsRegion;
    +    private Integer awsKeepAliveSeconds;
    +    private Date dtLastConnect;
    +
    +    public static final PropertyDescriptor PROP_ENDPOINT = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_ENDPOINT)
    +            .description("Your endpoint identifier in AWS IoT (e.g. A1B71MLXKNCXXX)")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_CLIENT = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_CLIENT)
    +            .description("MQTT client ID to use. Under the cover your input will be extended by a random " +
    +                    "string to ensure a unique id among all conntected clients.")
    +            .required(false)
    +            .defaultValue(PROP_DEFAULT_CLIENT)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_KEEPALIVE = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_KEEPALIVE)
    +            .description("Seconds a WebSocket-connection remains open after automatically renewing it. " +
    +                    "This is neccessary due to Amazon's service limit on WebSocket connection duration. " +
    +                    "As soon as the limit is changed by Amazon you can adjust the value here. Never use " +
    +                    "a duration longer than supported by Amazon. This processor renews the connection " +
    +                    "" + DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION + " seconds before the " +
    +                    "actual expiration. If no value set the default will be " + PROP_DEFAULT_KEEPALIVE + ".")
    +            .required(false)
    +            .defaultValue(PROP_DEFAULT_KEEPALIVE.toString())
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_TOPIC = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_TOPIC)
    +            .description("MQTT topic to work with. (pattern: $aws/things/mything/shadow/update).")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_QOS)
    +            .description("Decide for at most once (0) or at least once (1) message-receiption. " +
    +                    "Currently AWS IoT does not support QoS-level 2. If no value set the default QoS " +
    +                    "is " + DEFAULT_QOS + ".")
    +            .required(false)
    +            .allowableValues("0", "1")
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    /**
    +     * Create client using credentials provider. This is the preferred way for creating clients
    +     */
    +    @Override
    +    protected AWSIotClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
    +        getLogger().info("Creating client using aws credentials provider ");
    +        // Actually this client is not needed. However, it is initialized due to the pattern of
    +        // AbstractAWSCredentialsProviderProcessor
    +        return new AWSIotClient(credentialsProvider, config);
    +    }
    +
    +    /**
    +     * Create client using AWSCredentails
    +     *
    +     * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
    +     */
    +    @Override
    +    protected AWSIotClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
    +        getLogger().info("Creating client using aws credentials ");
    +        // Actually this client is not needed. it is initialized due to the pattern of
    +        // AbstractAWSProcessor
    +        return new AWSIotClient(credentials, config);
    +    }
    +
    +    /**
    +     * Gets ready an MQTT client by connecting to a AWS IoT WebSocket endpoint specific to the properties
    +     * @param context processor context
    +     */
    +    void init(final ProcessContext context) {
    +        // read out properties
    +        awsEndpoint = context.getProperty(PROP_ENDPOINT).getValue();
    +        awsRegion = context.getProperty(REGION).getValue();
    +        awsClientId = context.getProperty(PROP_CLIENT).isSet() ? context.getProperty(PROP_CLIENT).getValue() : PROP_DEFAULT_CLIENT;
    +        awsKeepAliveSeconds = context.getProperty(PROP_KEEPALIVE).isSet() ? context.getProperty(PROP_KEEPALIVE).asInteger() : PROP_DEFAULT_KEEPALIVE;
    +        awsTopic = context.getProperty(PROP_TOPIC).getValue();
    +        awsQos = context.getProperty(PROP_QOS).isSet() ? context.getProperty(PROP_QOS).asInteger() : DEFAULT_QOS;
    +        // initialize and connect to mqtt endpoint
    +        mqttClient = connect(context);
    +    }
    +
    +    @OnStopped
    +    public void onStopped(final ProcessContext context) {
    +        try {
    +            mqttClient.disconnect();
    +        } catch (MqttException me) {
    +            getLogger().warn("MQTT " + me.getMessage());
    +        }
    +        getLogger().info("Disconnected");
    +    }
    +
    +    /**
    +     * Returns the lifetime-seconds of the established websocket-connection
    +     * @return seconds
    +     */
    +    long getConnectionDuration() {
    +        return dtLastConnect != null
    +                ? TimeUnit.MILLISECONDS.toSeconds(new Date().getTime() - dtLastConnect.getTime())
    +                : awsKeepAliveSeconds + 1;
    +    }
    +
    +    /**
    +     * In seconds get the remaining lifetime of the connection. It is not the actual time to
    +     * expiration but an advice to when it is worth renewing the connection.
    +     * @return seconds
    +     */
    +    long getRemainingConnectionLifetime() {
    +        return awsKeepAliveSeconds - DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION;
    +    }
    +
    +    /**
    +     * Indicates if WebSocket connection is about to expire. It gives the caller an advice
    +     * to renew the connection some time before the actual expiration.
    +     * @return Indication (if true caller should renew the connection)
    +     */
    +    boolean isConnectionAboutToExpire() {
    +        return getConnectionDuration() > getRemainingConnectionLifetime();
    +    }
    +
    +    /**
    +     * Connects to the websocket-endpoint over an MQTT client.
    +     * @param context processcontext
    +     * @return websocket connection client
    +     */
    +    MqttWebSocketAsyncClient connect(ProcessContext context) {
    +        getCredentialsProvider(context).refresh();
    +        AWSCredentials awsCredentials = getCredentialsProvider(context).getCredentials();
    +        MqttWebSocketAsyncClient _mqttClient = null;
    +
    +        // generate mqtt endpoint-address with authentication details
    +        String strEndpointAddress;
    +        try {
    +            strEndpointAddress = AWS4Signer.getAddress(awsRegion, awsEndpoint, awsCredentials);
    +        } catch (Exception e) {
    +            getLogger().error("Error while generating AWS endpoint-address caused by " + e.getMessage());
    +            return null;
    +        }
    +        // extend clientId with random string in order to ensure unique id per connection
    +        String clientId = awsClientId + RandomStringUtils.random(12, true, false);
    --- End diff --
    
    The processor subscribing to a topic needs two clientIds as for QoS 1 it opens another after closing the first websocket connection. That said I have to append the user's clientId with something unique or I have to set up a second property and ask the user for a second clientId. Maybe the appendix should not have 12 characters but only switch between "-1" and "-2". What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by KayLerch <gi...@git.apache.org>.
Github user KayLerch commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65421271
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTProcessor.java ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.iot.AWSIotClient;
    +import org.apache.commons.lang3.RandomStringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +import org.apache.nifi.processors.aws.iot.util.AWS4Signer;
    +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.util.Date;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class AbstractAWSIoTProcessor extends AbstractAWSCredentialsProviderProcessor<AWSIotClient> {
    +    static final String PROP_NAME_ENDPOINT = "aws.iot.endpoint";
    +    static final String PROP_NAME_CLIENT = "aws.iot.mqtt.client";
    +    static final String PROP_NAME_KEEPALIVE = "aws.iot.mqtt.keepalive";
    +    static final String PROP_NAME_TOPIC = "aws.iot.mqtt.topic";
    +    static final String PROP_NAME_QOS = "aws.iot.mqtt.qos";
    +    /**
    +     * Amazon's current service limit on websocket connection duration
    +     */
    +    static final Integer PROP_DEFAULT_KEEPALIVE = 60 * 60 * 24;
    +    /**
    +     * When to start indicating the need for connection renewal (in seconds before actual termination)
    +     */
    +    static final Integer DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION = 20;
    +    static final String PROP_DEFAULT_CLIENT = AbstractAWSIoTProcessor.class.getSimpleName();
    +    /**
    +     * Default QoS level for message delivery
    +     */
    +    static final Integer DEFAULT_QOS = 0;
    +    String awsTopic;
    +    int awsQos;
    +    MqttWebSocketAsyncClient mqttClient;
    +    String awsEndpoint;
    +    String awsClientId;
    +
    +    private String awsRegion;
    +    private Integer awsKeepAliveSeconds;
    +    private Date dtLastConnect;
    +
    +    public static final PropertyDescriptor PROP_ENDPOINT = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_ENDPOINT)
    +            .description("Your endpoint identifier in AWS IoT (e.g. A1B71MLXKNCXXX)")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_CLIENT = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_CLIENT)
    +            .description("MQTT client ID to use. Under the cover your input will be extended by a random " +
    +                    "string to ensure a unique id among all conntected clients.")
    --- End diff --
    
    Changed it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/349


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by KayLerch <gi...@git.apache.org>.
Github user KayLerch commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65423497
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/GetAWSIoT.java ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.aws.iot.util.IoTMessage;
    +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.LinkedList;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +@Tags({"Amazon", "AWS", "IOT", "MQTT", "Websockets", "Get", "Subscribe", "Receive"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("Subscribes to and receives messages from MQTT-topic(s) of AWS IoT." +
    +    "The processor keeps open a WebSocket connection and will automatically renew the " +
    +    "connection to overcome Amazon's service limit on maximum connection duration. Depending on " +
    +    "your set up QoS the processor will miss some messages (QoS=0) or receives messages twice (QoS=1) " +
    +    "while reconnecting to AWS IoT WebSocket endpoint. We strongly recommend you to make use of " +
    +    "processor isolation as concurrent subscriptions to an MQTT topic result in multiple message receiptions.")
    +@SeeAlso({ GetAWSIoTShadow.class })
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.mqtt.endpoint", description = "AWS endpoint this message was received from."),
    +        @WritesAttribute(attribute = "aws.iot.mqtt.topic", description = "MQTT topic this message was received from."),
    +        @WritesAttribute(attribute = "aws.iot.mqtt.client", description = "MQTT client which received the message."),
    +        @WritesAttribute(attribute = "aws.iot.mqtt.qos", description = "Underlying MQTT quality-of-service.")
    +})
    +public class GetAWSIoT extends AbstractAWSIoTProcessor {
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_QOS,
    +                    PROP_TOPIC,
    +                    PROP_ENDPOINT,
    +                    PROP_KEEPALIVE,
    +                    PROP_CLIENT,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    REGION));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        // init to build up mqtt connection over web-sockets
    +        init(context);
    +        if (mqttClient != null && mqttClient.isConnected()) {
    +            try {
    +                // subscribe to topic with configured qos in order to start receiving messages
    +                mqttClient.subscribe(awsTopic, awsQos);
    +            } catch (MqttException e) {
    +                getLogger().error("Error while subscribing to topic " + awsTopic + " with client-id " + mqttClient.getClientId() + " caused by " + e.getMessage());
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        final List messageList = new LinkedList();
    +        // check if connection is about to terminate
    +        if (isConnectionAboutToExpire()) {
    +            MqttWebSocketAsyncClient _mqttClient = null;
    +            try {
    +                // before subscribing to the topic with new connection first unsubscribe
    +                // old connection from same topic if subscription is set to QoS 0
    +                if (awsQos == 0) mqttClient.unsubscribe(awsTopic);
    +                // establish a second connection
    +                _mqttClient = connect(context);
    +                // now subscribe to topic with new connection
    +                _mqttClient.subscribe(awsTopic, awsQos);
    +                // between re-subscription and disconnect from old connection
    +                // QoS=0 subscription eventually lose some messages
    +                // QoS=1 subscription eventually receive some messages twice
    +                // now terminate old connection
    +                mqttClient.disconnect();
    +            } catch (MqttException e) {
    +                getLogger().error("Error while renewing connection with client " + mqttClient.getClientId() + " caused by " + e.getMessage());
    +            } finally {
    +                // grab messages left over from old connection
    +                mqttClient.getAwsQueuedMqttMessages().drainTo(messageList);
    +                // now set the new connection as the default connection
    +                if (_mqttClient != null) mqttClient = _mqttClient;
    +            }
    +        } else {
    +            // grab messages which queued up since last run
    +            mqttClient.getAwsQueuedMqttMessages().drainTo(messageList);
    --- End diff --
    
    You're right. Changed it so it is now peeking messages from the top of the original list of messages.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65420217
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/IoTMessage.java ---
    @@ -0,0 +1,44 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot.util;
    +
    +import org.eclipse.paho.client.mqttv3.MqttMessage;
    +
    +public class IoTMessage {
    +    private final String topic;
    +    private final byte[] payload;
    +    private final Integer qos;
    +
    +    public IoTMessage(MqttMessage message, String topic) {
    --- End diff --
    
    MqttMessage also offers the "isDuplicate" and "isRetained" methods. Is there a reason for not including these as part of IoTMessage and passing onto the FlowFile as attributes?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the pull request:

    https://github.com/apache/nifi/pull/349#issuecomment-214779318
  
    I am on board with those names


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65420897
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PutAWSIoT.java ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.Map;
    +
    +@Tags({"Amazon", "AWS", "IOT", "MQTT", "Websockets", "Put", "Publish", "Send"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Publishes messages to MQTT-topic(s) of AWS IoT. The processor keeps open a WebSocket connection and will automatically renew the " +
    +        "connection to overcome Amazon's service limit on maximum connection duration. Most of the " +
    +        "configuration can be overridden by values coming in as message attributes. This applies for " +
    +        "the topic (corresponding message attribute is \"aws.iot.mqtt.topic.override\"), the qos-level " +
    +        "(\"aws.iot.mqtt.qos.override\") and the retention (\"aws.iot.mqtt.retained.override\")")
    +@SeeAlso({ PutAWSIoTShadow.class })
    +@ReadsAttributes({
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.topic.override", description = "Overrides the processor configuration for topic."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.qos.override", description = "Overrides the processor configuration for quality of service."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.retained.override", description = "Overrides the processor configuration for retaining a published state in the AWS shadow.")
    +})
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.mqtt.exception", description = "Error details")
    +})
    +public class PutAWSIoT extends AbstractAWSIoTProcessor {
    +    private final static String PROP_NAME_RETAINED = "aws.iot.mqtt.retained";
    +    private final static String ATTR_NAME_TOPIC = PROP_NAME_TOPIC + ".override";
    +    private final static String ATTR_NAME_QOS = PROP_NAME_QOS + ".override";
    +    private final static String ATTR_NAME_RETAINED = PROP_NAME_RETAINED + ".override";
    +    private final static String ATTR_NAME_EXCEPTION = "aws.iot.mqtt.exception";
    +    private final static Boolean PROP_DEFAULT_RETAINED = false;
    +    private Boolean shouldRetain;
    +
    +    public static final PropertyDescriptor PROP_RETAINED = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_RETAINED)
    +            .description("For messages being published, a true setting indicates that the MQTT server " +
    +                    "should retain a copy of the message. The message will then be transmitted to new " +
    +                    "subscribers to a topic that matches the message topic. For subscribers registering " +
    +                    "a new subscription, the flag being true indicates that the received message is not " +
    +                    "a new one, but one that has been retained by the MQTT server.")
    +            .required(true)
    +            .defaultValue(PROP_DEFAULT_RETAINED.toString())
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_QOS,
    +                    PROP_TOPIC,
    +                    PROP_RETAINED,
    +                    PROP_ENDPOINT,
    +                    PROP_KEEPALIVE,
    +                    PROP_CLIENT,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    TIMEOUT,
    +                    REGION));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        shouldRetain = context.getProperty(PROP_RETAINED).isSet() ? context.getProperty(PROP_RETAINED).asBoolean() : PROP_DEFAULT_RETAINED;
    +        init(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        // check if MQTT-connection is about to expire
    +        if (isConnectionAboutToExpire()) {
    +            // renew connection
    +            mqttClient = connect(context);
    +        }
    +        // get flowfile
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    --- End diff --
    
    This check for a flowfile should be before the reconnect. There is no point to reconnect if there is no FlowFile to process.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65413629
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTProcessor.java ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.iot.AWSIotClient;
    +import org.apache.commons.lang3.RandomStringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +import org.apache.nifi.processors.aws.iot.util.AWS4Signer;
    +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.util.Date;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class AbstractAWSIoTProcessor extends AbstractAWSCredentialsProviderProcessor<AWSIotClient> {
    +    static final String PROP_NAME_ENDPOINT = "aws.iot.endpoint";
    +    static final String PROP_NAME_CLIENT = "aws.iot.mqtt.client";
    +    static final String PROP_NAME_KEEPALIVE = "aws.iot.mqtt.keepalive";
    +    static final String PROP_NAME_TOPIC = "aws.iot.mqtt.topic";
    +    static final String PROP_NAME_QOS = "aws.iot.mqtt.qos";
    +    /**
    +     * Amazon's current service limit on websocket connection duration
    +     */
    +    static final Integer PROP_DEFAULT_KEEPALIVE = 60 * 60 * 24;
    --- End diff --
    
    Is there any static variable in the AWS libraries that could be referred to here? Don't want to worry about updating this every time there is a new version.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #349: NIFI-1767 AWS IoT processors

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/349
  
    @KayLerch do you think you'll have time to come back to this?  I'm inclined to close this as part of a stale PR sweep but wanted to check with you.  Totally understand if so.  And if you can't do it soon then perhaps you can close it and try again later?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the pull request:

    https://github.com/apache/nifi/pull/349
  
    Yup I will start an in-depth review


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65385897
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PutAWSIoT.java ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.Map;
    +
    +@Tags({"Amazon", "AWS", "IOT", "MQTT", "Websockets", "Put", "Publish", "Send"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Publishes messages to MQTT-topic(s) of AWS IoT. The processor keeps open a WebSocket connection and will automatically renew the " +
    +        "connection to overcome Amazon's service limit on maximum connection duration. Most of the " +
    +        "configuration can be overridden by values coming in as message attributes. This applies for " +
    +        "the topic (corresponding message attribute is \"aws.iot.mqtt.topic.override\"), the qos-level " +
    +        "(\"aws.iot.mqtt.qos.override\") and the retention (\"aws.iot.mqtt.retained.override\")")
    +@SeeAlso({ PutAWSIoTShadow.class })
    +@ReadsAttributes({
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.topic.override", description = "Overrides the processor configuration for topic."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.qos.override", description = "Overrides the processor configuration for quality of service."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.retained.override", description = "Overrides the processor configuration for retaining a published state in the AWS shadow.")
    +})
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.mqtt.exception", description = "Error details")
    +})
    +public class PutAWSIoT extends AbstractAWSIoTProcessor {
    +    private final static String PROP_NAME_RETAINED = "aws.iot.mqtt.retained";
    +    private final static String ATTR_NAME_TOPIC = PROP_NAME_TOPIC + ".override";
    +    private final static String ATTR_NAME_QOS = PROP_NAME_QOS + ".override";
    +    private final static String ATTR_NAME_RETAINED = PROP_NAME_RETAINED + ".override";
    +    private final static String ATTR_NAME_EXCEPTION = "aws.iot.mqtt.exception";
    +    private final static Boolean PROP_DEFAULT_RETAINED = false;
    +    private Boolean shouldRetain;
    +
    +    public static final PropertyDescriptor PROP_RETAINED = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_RETAINED)
    +            .description("For messages being published, a true setting indicates that the MQTT server " +
    +                    "should retain a copy of the message. The message will then be transmitted to new " +
    +                    "subscribers to a topic that matches the message topic. For subscribers registering " +
    +                    "a new subscription, the flag being true indicates that the received message is not " +
    +                    "a new one, but one that has been retained by the MQTT server.")
    +            .required(true)
    +            .defaultValue(PROP_DEFAULT_RETAINED.toString())
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_QOS,
    +                    PROP_TOPIC,
    +                    PROP_RETAINED,
    +                    PROP_ENDPOINT,
    +                    PROP_KEEPALIVE,
    +                    PROP_CLIENT,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    TIMEOUT,
    +                    REGION));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        shouldRetain = context.getProperty(PROP_RETAINED).isSet() ? context.getProperty(PROP_RETAINED).asBoolean() : PROP_DEFAULT_RETAINED;
    +        init(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        // check if MQTT-connection is about to expire
    +        if (isConnectionAboutToExpire()) {
    +            // renew connection
    +            mqttClient = connect(context);
    +        }
    +        // get flowfile
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +        Map<String, String> attributes = flowFile.getAttributes();
    +        // if provided override MQTT configuration with values from the corresponding message attributes
    +        String topic = attributes.containsKey(ATTR_NAME_TOPIC) ? attributes.get(ATTR_NAME_TOPIC) : awsTopic;
    +        Integer qos = attributes.containsKey(ATTR_NAME_QOS) ? Integer.parseInt(attributes.get(ATTR_NAME_QOS)) : awsQos;
    +        Boolean retained = attributes.containsKey(ATTR_NAME_RETAINED) ? Boolean.parseBoolean(attributes.get(ATTR_NAME_RETAINED)) : shouldRetain;
    +        // get message content
    +        final ByteArrayOutputStream fileContentStream = new ByteArrayOutputStream();
    +        session.exportTo(flowFile, fileContentStream);
    +
    +        try {
    +            // publish messages to mqtt-topic(s)
    +            mqttClient.publish(topic, fileContentStream.toByteArray(), qos, retained);
    +            session.transfer(flowFile, REL_SUCCESS);
    --- End diff --
    
    Since the mqttClient is working async, there is the chance that the client will attempt to publish, the processor will then transfer the flowfile and then the publish will fail, leading to data loss (acceptable with QoS 0 but not 1). The best way to handle this is for QoS 1 to wait until the message is delivered, like the default Paho MQTT client does: 
    
    https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttClient.java#L361


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/349
  
    The more I type out and try to easily distinguish GetAWSIoT and PutAWSIoT the more I think MQTT should be integrated into the name and all four of the processors should be Publish/Consume. 
    
    The GetAWSIoT and PutAWSIoT don't support all the AWS IoT protocols (don't support "HTTP") and only support MQTT and MQTT over websockets. The name should convey that they are specifically MQTT processors.
    
    In NiFi, we have a couple different naming conventions. Get/Put are for processors that do individual requests to some endpoint. For example GetFile and PutFile, they don't have any lasting connections and each time get or put a file. Publish and Consume, in general, are for processors which are listening/putting to a topic. For example the PublishKafka and ConsumeKafka processors publish and subscribe to a Kafka topic (it's "consume" instead "subscribe" because we're actually doing something with the messages and not just "subscribing" to a topic, there was a whole big thread about it).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #349: NIFI-1767 AWS IoT processors

Posted by KayLerch <gi...@git.apache.org>.
Github user KayLerch commented on the issue:

    https://github.com/apache/nifi/pull/349
  
    @JPercivall : I started to work on the issues but did not finish. Have a lot of going on at the moment and no time in the next three weeks to finish my work :-( 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the pull request:

    https://github.com/apache/nifi/pull/349#issuecomment-214773915
  
    @KayLerch I definitely appreciate the decision to follow the established naming convention but I'm for breaking the convention for these processors, where the name is not unique. 
    
    If anyone else has a dissenting opinion feel free to comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1767 AWS IoT processors

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/nifi/pull/349
  
    @JPercivall you by chance have bandwidth to keep progressing this with @KayLerch ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65418441
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTProcessor.java ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.iot.AWSIotClient;
    +import org.apache.commons.lang3.RandomStringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +import org.apache.nifi.processors.aws.iot.util.AWS4Signer;
    +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.util.Date;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class AbstractAWSIoTProcessor extends AbstractAWSCredentialsProviderProcessor<AWSIotClient> {
    +    static final String PROP_NAME_ENDPOINT = "aws.iot.endpoint";
    +    static final String PROP_NAME_CLIENT = "aws.iot.mqtt.client";
    +    static final String PROP_NAME_KEEPALIVE = "aws.iot.mqtt.keepalive";
    +    static final String PROP_NAME_TOPIC = "aws.iot.mqtt.topic";
    +    static final String PROP_NAME_QOS = "aws.iot.mqtt.qos";
    +    /**
    +     * Amazon's current service limit on websocket connection duration
    +     */
    +    static final Integer PROP_DEFAULT_KEEPALIVE = 60 * 60 * 24;
    +    /**
    +     * When to start indicating the need for connection renewal (in seconds before actual termination)
    +     */
    +    static final Integer DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION = 20;
    +    static final String PROP_DEFAULT_CLIENT = AbstractAWSIoTProcessor.class.getSimpleName();
    +    /**
    +     * Default QoS level for message delivery
    +     */
    +    static final Integer DEFAULT_QOS = 0;
    +    String awsTopic;
    +    int awsQos;
    +    MqttWebSocketAsyncClient mqttClient;
    +    String awsEndpoint;
    +    String awsClientId;
    +
    +    private String awsRegion;
    +    private Integer awsKeepAliveSeconds;
    +    private Date dtLastConnect;
    +
    +    public static final PropertyDescriptor PROP_ENDPOINT = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_ENDPOINT)
    +            .description("Your endpoint identifier in AWS IoT (e.g. A1B71MLXKNCXXX)")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_CLIENT = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_CLIENT)
    +            .description("MQTT client ID to use. Under the cover your input will be extended by a random " +
    +                    "string to ensure a unique id among all conntected clients.")
    +            .required(false)
    +            .defaultValue(PROP_DEFAULT_CLIENT)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_KEEPALIVE = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_KEEPALIVE)
    +            .description("Seconds a WebSocket-connection remains open after automatically renewing it. " +
    +                    "This is neccessary due to Amazon's service limit on WebSocket connection duration. " +
    +                    "As soon as the limit is changed by Amazon you can adjust the value here. Never use " +
    +                    "a duration longer than supported by Amazon. This processor renews the connection " +
    +                    "" + DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION + " seconds before the " +
    +                    "actual expiration. If no value set the default will be " + PROP_DEFAULT_KEEPALIVE + ".")
    +            .required(false)
    +            .defaultValue(PROP_DEFAULT_KEEPALIVE.toString())
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_TOPIC = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_TOPIC)
    +            .description("MQTT topic to work with. (pattern: $aws/things/mything/shadow/update).")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_QOS)
    +            .description("Decide for at most once (0) or at least once (1) message-receiption. " +
    +                    "Currently AWS IoT does not support QoS-level 2. If no value set the default QoS " +
    +                    "is " + DEFAULT_QOS + ".")
    +            .required(false)
    +            .allowableValues("0", "1")
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    /**
    +     * Create client using credentials provider. This is the preferred way for creating clients
    +     */
    +    @Override
    +    protected AWSIotClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
    +        getLogger().info("Creating client using aws credentials provider ");
    +        // Actually this client is not needed. However, it is initialized due to the pattern of
    +        // AbstractAWSCredentialsProviderProcessor
    +        return new AWSIotClient(credentialsProvider, config);
    +    }
    +
    +    /**
    +     * Create client using AWSCredentails
    +     *
    +     * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
    +     */
    +    @Override
    +    protected AWSIotClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
    +        getLogger().info("Creating client using aws credentials ");
    +        // Actually this client is not needed. it is initialized due to the pattern of
    +        // AbstractAWSProcessor
    +        return new AWSIotClient(credentials, config);
    +    }
    +
    +    /**
    +     * Gets ready an MQTT client by connecting to a AWS IoT WebSocket endpoint specific to the properties
    +     * @param context processor context
    +     */
    +    void init(final ProcessContext context) {
    +        // read out properties
    +        awsEndpoint = context.getProperty(PROP_ENDPOINT).getValue();
    +        awsRegion = context.getProperty(REGION).getValue();
    +        awsClientId = context.getProperty(PROP_CLIENT).isSet() ? context.getProperty(PROP_CLIENT).getValue() : PROP_DEFAULT_CLIENT;
    +        awsKeepAliveSeconds = context.getProperty(PROP_KEEPALIVE).isSet() ? context.getProperty(PROP_KEEPALIVE).asInteger() : PROP_DEFAULT_KEEPALIVE;
    +        awsTopic = context.getProperty(PROP_TOPIC).getValue();
    +        awsQos = context.getProperty(PROP_QOS).isSet() ? context.getProperty(PROP_QOS).asInteger() : DEFAULT_QOS;
    +        // initialize and connect to mqtt endpoint
    +        mqttClient = connect(context);
    +    }
    +
    +    @OnStopped
    +    public void onStopped(final ProcessContext context) {
    +        try {
    +            mqttClient.disconnect();
    +        } catch (MqttException me) {
    +            getLogger().warn("MQTT " + me.getMessage());
    +        }
    +        getLogger().info("Disconnected");
    +    }
    +
    +    /**
    +     * Returns the lifetime-seconds of the established websocket-connection
    +     * @return seconds
    +     */
    +    long getConnectionDuration() {
    +        return dtLastConnect != null
    +                ? TimeUnit.MILLISECONDS.toSeconds(new Date().getTime() - dtLastConnect.getTime())
    +                : awsKeepAliveSeconds + 1;
    +    }
    +
    +    /**
    +     * In seconds get the remaining lifetime of the connection. It is not the actual time to
    +     * expiration but an advice to when it is worth renewing the connection.
    +     * @return seconds
    +     */
    +    long getRemainingConnectionLifetime() {
    +        return awsKeepAliveSeconds - DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION;
    +    }
    +
    +    /**
    +     * Indicates if WebSocket connection is about to expire. It gives the caller an advice
    +     * to renew the connection some time before the actual expiration.
    +     * @return Indication (if true caller should renew the connection)
    +     */
    +    boolean isConnectionAboutToExpire() {
    +        return getConnectionDuration() > getRemainingConnectionLifetime();
    +    }
    +
    +    /**
    +     * Connects to the websocket-endpoint over an MQTT client.
    +     * @param context processcontext
    +     * @return websocket connection client
    +     */
    +    MqttWebSocketAsyncClient connect(ProcessContext context) {
    --- End diff --
    
    The beginning portion of this method re-creates multiple different local variables to use in reconnecting. Maybe persist anything can be re-used (like the connections options).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by KayLerch <gi...@git.apache.org>.
Github user KayLerch commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65422291
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PutAWSIoT.java ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.Map;
    +
    +@Tags({"Amazon", "AWS", "IOT", "MQTT", "Websockets", "Put", "Publish", "Send"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Publishes messages to MQTT-topic(s) of AWS IoT. The processor keeps open a WebSocket connection and will automatically renew the " +
    +        "connection to overcome Amazon's service limit on maximum connection duration. Most of the " +
    +        "configuration can be overridden by values coming in as message attributes. This applies for " +
    +        "the topic (corresponding message attribute is \"aws.iot.mqtt.topic.override\"), the qos-level " +
    +        "(\"aws.iot.mqtt.qos.override\") and the retention (\"aws.iot.mqtt.retained.override\")")
    +@SeeAlso({ PutAWSIoTShadow.class })
    +@ReadsAttributes({
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.topic.override", description = "Overrides the processor configuration for topic."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.qos.override", description = "Overrides the processor configuration for quality of service."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.retained.override", description = "Overrides the processor configuration for retaining a published state in the AWS shadow.")
    +})
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.mqtt.exception", description = "Error details")
    +})
    +public class PutAWSIoT extends AbstractAWSIoTProcessor {
    +    private final static String PROP_NAME_RETAINED = "aws.iot.mqtt.retained";
    +    private final static String ATTR_NAME_TOPIC = PROP_NAME_TOPIC + ".override";
    +    private final static String ATTR_NAME_QOS = PROP_NAME_QOS + ".override";
    +    private final static String ATTR_NAME_RETAINED = PROP_NAME_RETAINED + ".override";
    +    private final static String ATTR_NAME_EXCEPTION = "aws.iot.mqtt.exception";
    +    private final static Boolean PROP_DEFAULT_RETAINED = false;
    +    private Boolean shouldRetain;
    +
    +    public static final PropertyDescriptor PROP_RETAINED = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_RETAINED)
    +            .description("For messages being published, a true setting indicates that the MQTT server " +
    +                    "should retain a copy of the message. The message will then be transmitted to new " +
    +                    "subscribers to a topic that matches the message topic. For subscribers registering " +
    +                    "a new subscription, the flag being true indicates that the received message is not " +
    +                    "a new one, but one that has been retained by the MQTT server.")
    +            .required(true)
    +            .defaultValue(PROP_DEFAULT_RETAINED.toString())
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_QOS,
    +                    PROP_TOPIC,
    +                    PROP_RETAINED,
    +                    PROP_ENDPOINT,
    +                    PROP_KEEPALIVE,
    +                    PROP_CLIENT,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    TIMEOUT,
    +                    REGION));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        shouldRetain = context.getProperty(PROP_RETAINED).isSet() ? context.getProperty(PROP_RETAINED).asBoolean() : PROP_DEFAULT_RETAINED;
    +        init(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        // check if MQTT-connection is about to expire
    +        if (isConnectionAboutToExpire()) {
    +            // renew connection
    +            mqttClient = connect(context);
    +        }
    +        // get flowfile
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +        Map<String, String> attributes = flowFile.getAttributes();
    +        // if provided override MQTT configuration with values from the corresponding message attributes
    +        String topic = attributes.containsKey(ATTR_NAME_TOPIC) ? attributes.get(ATTR_NAME_TOPIC) : awsTopic;
    +        Integer qos = attributes.containsKey(ATTR_NAME_QOS) ? Integer.parseInt(attributes.get(ATTR_NAME_QOS)) : awsQos;
    +        Boolean retained = attributes.containsKey(ATTR_NAME_RETAINED) ? Boolean.parseBoolean(attributes.get(ATTR_NAME_RETAINED)) : shouldRetain;
    +        // get message content
    +        final ByteArrayOutputStream fileContentStream = new ByteArrayOutputStream();
    +        session.exportTo(flowFile, fileContentStream);
    +
    +        try {
    +            // publish messages to mqtt-topic(s)
    +            mqttClient.publish(topic, fileContentStream.toByteArray(), qos, retained);
    +            session.transfer(flowFile, REL_SUCCESS);
    --- End diff --
    
    Really good point. Wasn't aware of this. There are some more async mqtt actions which didn't waitForCompletion. Fixed all of them. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by KayLerch <gi...@git.apache.org>.
Github user KayLerch commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65422534
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PutAWSIoT.java ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.Map;
    +
    +@Tags({"Amazon", "AWS", "IOT", "MQTT", "Websockets", "Put", "Publish", "Send"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Publishes messages to MQTT-topic(s) of AWS IoT. The processor keeps open a WebSocket connection and will automatically renew the " +
    +        "connection to overcome Amazon's service limit on maximum connection duration. Most of the " +
    +        "configuration can be overridden by values coming in as message attributes. This applies for " +
    +        "the topic (corresponding message attribute is \"aws.iot.mqtt.topic.override\"), the qos-level " +
    +        "(\"aws.iot.mqtt.qos.override\") and the retention (\"aws.iot.mqtt.retained.override\")")
    +@SeeAlso({ PutAWSIoTShadow.class })
    +@ReadsAttributes({
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.topic.override", description = "Overrides the processor configuration for topic."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.qos.override", description = "Overrides the processor configuration for quality of service."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.retained.override", description = "Overrides the processor configuration for retaining a published state in the AWS shadow.")
    +})
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.mqtt.exception", description = "Error details")
    +})
    +public class PutAWSIoT extends AbstractAWSIoTProcessor {
    +    private final static String PROP_NAME_RETAINED = "aws.iot.mqtt.retained";
    +    private final static String ATTR_NAME_TOPIC = PROP_NAME_TOPIC + ".override";
    +    private final static String ATTR_NAME_QOS = PROP_NAME_QOS + ".override";
    +    private final static String ATTR_NAME_RETAINED = PROP_NAME_RETAINED + ".override";
    +    private final static String ATTR_NAME_EXCEPTION = "aws.iot.mqtt.exception";
    +    private final static Boolean PROP_DEFAULT_RETAINED = false;
    +    private Boolean shouldRetain;
    +
    +    public static final PropertyDescriptor PROP_RETAINED = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_RETAINED)
    +            .description("For messages being published, a true setting indicates that the MQTT server " +
    +                    "should retain a copy of the message. The message will then be transmitted to new " +
    +                    "subscribers to a topic that matches the message topic. For subscribers registering " +
    +                    "a new subscription, the flag being true indicates that the received message is not " +
    +                    "a new one, but one that has been retained by the MQTT server.")
    +            .required(true)
    +            .defaultValue(PROP_DEFAULT_RETAINED.toString())
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_QOS,
    +                    PROP_TOPIC,
    +                    PROP_RETAINED,
    +                    PROP_ENDPOINT,
    +                    PROP_KEEPALIVE,
    +                    PROP_CLIENT,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    TIMEOUT,
    +                    REGION));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        shouldRetain = context.getProperty(PROP_RETAINED).isSet() ? context.getProperty(PROP_RETAINED).asBoolean() : PROP_DEFAULT_RETAINED;
    +        init(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        // check if MQTT-connection is about to expire
    +        if (isConnectionAboutToExpire()) {
    +            // renew connection
    +            mqttClient = connect(context);
    +        }
    +        // get flowfile
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +        Map<String, String> attributes = flowFile.getAttributes();
    +        // if provided override MQTT configuration with values from the corresponding message attributes
    +        String topic = attributes.containsKey(ATTR_NAME_TOPIC) ? attributes.get(ATTR_NAME_TOPIC) : awsTopic;
    +        Integer qos = attributes.containsKey(ATTR_NAME_QOS) ? Integer.parseInt(attributes.get(ATTR_NAME_QOS)) : awsQos;
    +        Boolean retained = attributes.containsKey(ATTR_NAME_RETAINED) ? Boolean.parseBoolean(attributes.get(ATTR_NAME_RETAINED)) : shouldRetain;
    +        // get message content
    +        final ByteArrayOutputStream fileContentStream = new ByteArrayOutputStream();
    +        session.exportTo(flowFile, fileContentStream);
    +
    +        try {
    +            // publish messages to mqtt-topic(s)
    +            mqttClient.publish(topic, fileContentStream.toByteArray(), qos, retained);
    +            session.transfer(flowFile, REL_SUCCESS);
    +            session.getProvenanceReporter().send(flowFile, awsEndpoint + "(" + awsClientId + ")");
    +        } catch (MqttException e) {
    +            getLogger().error("Error while initially subscribing to topics with client " + mqttClient.getClientId() + " caused by " + e.getMessage());
    --- End diff --
    
    collateral damage of pasting code ;) changed it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/349
  
    Currently the only tests are integration tests that rely on an AWS instance. I would like to see more true unit tests. For inspiration, check out the Publish/Consume MQTT unit tests[1].
    
    [1] https://github.com/apache/nifi/tree/f47af1ce8336c9305916f00738976f3505b01b0b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/349
  
    The Get/PutAWSIoT processors read and overwrite mqttClient many times without a read/write lock, this can lead to problems in multi-threaded scenarios. For example, the both processors call "AbstractAWSIoTProcessor.connect()" when the connection is about to expire. When a processor is running with more than one thread, multiple will potentially enter that method and attempt to reconnect the client.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65425287
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTProcessor.java ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.iot.AWSIotClient;
    +import org.apache.commons.lang3.RandomStringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +import org.apache.nifi.processors.aws.iot.util.AWS4Signer;
    +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.util.Date;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class AbstractAWSIoTProcessor extends AbstractAWSCredentialsProviderProcessor<AWSIotClient> {
    +    static final String PROP_NAME_ENDPOINT = "aws.iot.endpoint";
    +    static final String PROP_NAME_CLIENT = "aws.iot.mqtt.client";
    +    static final String PROP_NAME_KEEPALIVE = "aws.iot.mqtt.keepalive";
    +    static final String PROP_NAME_TOPIC = "aws.iot.mqtt.topic";
    +    static final String PROP_NAME_QOS = "aws.iot.mqtt.qos";
    +    /**
    +     * Amazon's current service limit on websocket connection duration
    +     */
    +    static final Integer PROP_DEFAULT_KEEPALIVE = 60 * 60 * 24;
    +    /**
    +     * When to start indicating the need for connection renewal (in seconds before actual termination)
    +     */
    +    static final Integer DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION = 20;
    +    static final String PROP_DEFAULT_CLIENT = AbstractAWSIoTProcessor.class.getSimpleName();
    +    /**
    +     * Default QoS level for message delivery
    +     */
    +    static final Integer DEFAULT_QOS = 0;
    +    String awsTopic;
    +    int awsQos;
    +    MqttWebSocketAsyncClient mqttClient;
    +    String awsEndpoint;
    +    String awsClientId;
    +
    +    private String awsRegion;
    +    private Integer awsKeepAliveSeconds;
    +    private Date dtLastConnect;
    +
    +    public static final PropertyDescriptor PROP_ENDPOINT = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_ENDPOINT)
    +            .description("Your endpoint identifier in AWS IoT (e.g. A1B71MLXKNCXXX)")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_CLIENT = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_CLIENT)
    +            .description("MQTT client ID to use. Under the cover your input will be extended by a random " +
    +                    "string to ensure a unique id among all conntected clients.")
    +            .required(false)
    +            .defaultValue(PROP_DEFAULT_CLIENT)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_KEEPALIVE = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_KEEPALIVE)
    +            .description("Seconds a WebSocket-connection remains open after automatically renewing it. " +
    +                    "This is neccessary due to Amazon's service limit on WebSocket connection duration. " +
    +                    "As soon as the limit is changed by Amazon you can adjust the value here. Never use " +
    +                    "a duration longer than supported by Amazon. This processor renews the connection " +
    +                    "" + DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION + " seconds before the " +
    +                    "actual expiration. If no value set the default will be " + PROP_DEFAULT_KEEPALIVE + ".")
    +            .required(false)
    +            .defaultValue(PROP_DEFAULT_KEEPALIVE.toString())
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_TOPIC = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_TOPIC)
    +            .description("MQTT topic to work with. (pattern: $aws/things/mything/shadow/update).")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_QOS)
    +            .description("Decide for at most once (0) or at least once (1) message-receiption. " +
    +                    "Currently AWS IoT does not support QoS-level 2. If no value set the default QoS " +
    +                    "is " + DEFAULT_QOS + ".")
    +            .required(false)
    +            .allowableValues("0", "1")
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    /**
    +     * Create client using credentials provider. This is the preferred way for creating clients
    +     */
    +    @Override
    +    protected AWSIotClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
    +        getLogger().info("Creating client using aws credentials provider ");
    +        // Actually this client is not needed. However, it is initialized due to the pattern of
    +        // AbstractAWSCredentialsProviderProcessor
    +        return new AWSIotClient(credentialsProvider, config);
    +    }
    +
    +    /**
    +     * Create client using AWSCredentails
    +     *
    +     * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
    +     */
    +    @Override
    +    protected AWSIotClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
    +        getLogger().info("Creating client using aws credentials ");
    +        // Actually this client is not needed. it is initialized due to the pattern of
    +        // AbstractAWSProcessor
    +        return new AWSIotClient(credentials, config);
    +    }
    +
    +    /**
    +     * Gets ready an MQTT client by connecting to a AWS IoT WebSocket endpoint specific to the properties
    +     * @param context processor context
    +     */
    +    void init(final ProcessContext context) {
    +        // read out properties
    +        awsEndpoint = context.getProperty(PROP_ENDPOINT).getValue();
    +        awsRegion = context.getProperty(REGION).getValue();
    +        awsClientId = context.getProperty(PROP_CLIENT).isSet() ? context.getProperty(PROP_CLIENT).getValue() : PROP_DEFAULT_CLIENT;
    +        awsKeepAliveSeconds = context.getProperty(PROP_KEEPALIVE).isSet() ? context.getProperty(PROP_KEEPALIVE).asInteger() : PROP_DEFAULT_KEEPALIVE;
    +        awsTopic = context.getProperty(PROP_TOPIC).getValue();
    +        awsQos = context.getProperty(PROP_QOS).isSet() ? context.getProperty(PROP_QOS).asInteger() : DEFAULT_QOS;
    +        // initialize and connect to mqtt endpoint
    +        mqttClient = connect(context);
    +    }
    +
    +    @OnStopped
    +    public void onStopped(final ProcessContext context) {
    +        try {
    +            mqttClient.disconnect();
    +        } catch (MqttException me) {
    +            getLogger().warn("MQTT " + me.getMessage());
    +        }
    +        getLogger().info("Disconnected");
    +    }
    +
    +    /**
    +     * Returns the lifetime-seconds of the established websocket-connection
    +     * @return seconds
    +     */
    +    long getConnectionDuration() {
    +        return dtLastConnect != null
    +                ? TimeUnit.MILLISECONDS.toSeconds(new Date().getTime() - dtLastConnect.getTime())
    +                : awsKeepAliveSeconds + 1;
    +    }
    +
    +    /**
    +     * In seconds get the remaining lifetime of the connection. It is not the actual time to
    +     * expiration but an advice to when it is worth renewing the connection.
    +     * @return seconds
    +     */
    +    long getRemainingConnectionLifetime() {
    +        return awsKeepAliveSeconds - DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION;
    +    }
    +
    +    /**
    +     * Indicates if WebSocket connection is about to expire. It gives the caller an advice
    +     * to renew the connection some time before the actual expiration.
    +     * @return Indication (if true caller should renew the connection)
    +     */
    +    boolean isConnectionAboutToExpire() {
    +        return getConnectionDuration() > getRemainingConnectionLifetime();
    +    }
    +
    +    /**
    +     * Connects to the websocket-endpoint over an MQTT client.
    +     * @param context processcontext
    +     * @return websocket connection client
    +     */
    +    MqttWebSocketAsyncClient connect(ProcessContext context) {
    +        getCredentialsProvider(context).refresh();
    +        AWSCredentials awsCredentials = getCredentialsProvider(context).getCredentials();
    +        MqttWebSocketAsyncClient _mqttClient = null;
    +
    +        // generate mqtt endpoint-address with authentication details
    +        String strEndpointAddress;
    +        try {
    +            strEndpointAddress = AWS4Signer.getAddress(awsRegion, awsEndpoint, awsCredentials);
    +        } catch (Exception e) {
    +            getLogger().error("Error while generating AWS endpoint-address caused by " + e.getMessage());
    +            return null;
    +        }
    +        // extend clientId with random string in order to ensure unique id per connection
    +        String clientId = awsClientId + RandomStringUtils.random(12, true, false);
    --- End diff --
    
    Hmmm, that is tricky/annoying there isn't a better way to handle reconnecting the same connection/clientId without losing messages.
    
    If there will only ever be two different connections at a time then I would vote for switching between appending "-1" and "-2". This will give better protection against having multiple connections with the same base clientId. Also be sure to mention this in the description of the "clientId" property.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65419729
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/GetAWSIoT.java ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.aws.iot.util.IoTMessage;
    +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.LinkedList;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +@Tags({"Amazon", "AWS", "IOT", "MQTT", "Websockets", "Get", "Subscribe", "Receive"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("Subscribes to and receives messages from MQTT-topic(s) of AWS IoT." +
    +    "The processor keeps open a WebSocket connection and will automatically renew the " +
    +    "connection to overcome Amazon's service limit on maximum connection duration. Depending on " +
    +    "your set up QoS the processor will miss some messages (QoS=0) or receives messages twice (QoS=1) " +
    +    "while reconnecting to AWS IoT WebSocket endpoint. We strongly recommend you to make use of " +
    +    "processor isolation as concurrent subscriptions to an MQTT topic result in multiple message receiptions.")
    +@SeeAlso({ GetAWSIoTShadow.class })
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.mqtt.endpoint", description = "AWS endpoint this message was received from."),
    +        @WritesAttribute(attribute = "aws.iot.mqtt.topic", description = "MQTT topic this message was received from."),
    +        @WritesAttribute(attribute = "aws.iot.mqtt.client", description = "MQTT client which received the message."),
    +        @WritesAttribute(attribute = "aws.iot.mqtt.qos", description = "Underlying MQTT quality-of-service.")
    +})
    +public class GetAWSIoT extends AbstractAWSIoTProcessor {
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_QOS,
    +                    PROP_TOPIC,
    +                    PROP_ENDPOINT,
    +                    PROP_KEEPALIVE,
    +                    PROP_CLIENT,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    REGION));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        // init to build up mqtt connection over web-sockets
    +        init(context);
    +        if (mqttClient != null && mqttClient.isConnected()) {
    +            try {
    +                // subscribe to topic with configured qos in order to start receiving messages
    +                mqttClient.subscribe(awsTopic, awsQos);
    +            } catch (MqttException e) {
    +                getLogger().error("Error while subscribing to topic " + awsTopic + " with client-id " + mqttClient.getClientId() + " caused by " + e.getMessage());
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        final List messageList = new LinkedList();
    +        // check if connection is about to terminate
    +        if (isConnectionAboutToExpire()) {
    +            MqttWebSocketAsyncClient _mqttClient = null;
    +            try {
    +                // before subscribing to the topic with new connection first unsubscribe
    +                // old connection from same topic if subscription is set to QoS 0
    +                if (awsQos == 0) mqttClient.unsubscribe(awsTopic);
    +                // establish a second connection
    +                _mqttClient = connect(context);
    +                // now subscribe to topic with new connection
    +                _mqttClient.subscribe(awsTopic, awsQos);
    +                // between re-subscription and disconnect from old connection
    +                // QoS=0 subscription eventually lose some messages
    +                // QoS=1 subscription eventually receive some messages twice
    +                // now terminate old connection
    +                mqttClient.disconnect();
    +            } catch (MqttException e) {
    +                getLogger().error("Error while renewing connection with client " + mqttClient.getClientId() + " caused by " + e.getMessage());
    +            } finally {
    +                // grab messages left over from old connection
    +                mqttClient.getAwsQueuedMqttMessages().drainTo(messageList);
    +                // now set the new connection as the default connection
    +                if (_mqttClient != null) mqttClient = _mqttClient;
    --- End diff --
    
    Line 111 (_mqttClient.subscribe) can throw an MqttException which would lead to _mqttClient being initialized and connected but not subscribed. Then on this line it would replace mqttClient with _mqttClient. There should be a check to make sure the it successfully subscribed. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/349
  
    A couple places have weird comments like "// @TRACE 252=connect to host {0} port {1} timeout {2}". Is this a convention I'm not aware of or left over from copy/paste?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi AWS IoT processors

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/nifi/pull/349#issuecomment-214775313
  
    I agree with the point @KayLerch made.  This processor is not just mqtt but is mqtt & aws so i think his naming decision makes sense.  Good to talk about the consistency approach though 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi AWS IoT processors

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/nifi/pull/349#issuecomment-209646256
  
    Hello @KayLerch .  Really cool contribution and awesome start.  We'll want to make sure there is an open Apache NiFi JIRA for this and you are more than welcome to create that.  We'll need the commits to reference that JIRA.  Take a look here for some details to help the review and follow-on processes go smoothly.  https://cwiki.apache.org/confluence/display/NIFI/Contributor+Guide
    
    Look forward to helping you get this through the review process and included.  Your PDF writeup of the requirement and your approach is very nicely done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi AWS IoT processors

Posted by KayLerch <gi...@git.apache.org>.
Github user KayLerch commented on the pull request:

    https://github.com/apache/nifi/pull/349#issuecomment-209656778
  
    Hi Joe,
    thanks for helping me out. I just opened an issue in JIRA => [NIFI-1767](https://issues.apache.org/jira/browse/NIFI-1767)
    
    Looking forward to getting this through :)



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #349: NIFI-1767 AWS IoT processors

Posted by KayLerch <gi...@git.apache.org>.
Github user KayLerch commented on the issue:

    https://github.com/apache/nifi/pull/349
  
    @joewitt Hope you're fine. Sorry for not having my work completed. It's been a busy time.
    @trixpan thanks a lot for taking this over. You're right: AWS released a Java Client SDK for IoT shortly after I submitted the PR. I agree on rather building on top of this than going forward with the custom implementation. However, AWSIoT processors for NiFi would be a big win. I am looking forward to using it.
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/349
  
    Does AWS IoT support communicating over SSL with MQTT or shadows? If so, that option should be added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65387960
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PutAWSIoT.java ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.Map;
    +
    +@Tags({"Amazon", "AWS", "IOT", "MQTT", "Websockets", "Put", "Publish", "Send"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Publishes messages to MQTT-topic(s) of AWS IoT. The processor keeps open a WebSocket connection and will automatically renew the " +
    +        "connection to overcome Amazon's service limit on maximum connection duration. Most of the " +
    +        "configuration can be overridden by values coming in as message attributes. This applies for " +
    +        "the topic (corresponding message attribute is \"aws.iot.mqtt.topic.override\"), the qos-level " +
    +        "(\"aws.iot.mqtt.qos.override\") and the retention (\"aws.iot.mqtt.retained.override\")")
    +@SeeAlso({ PutAWSIoTShadow.class })
    +@ReadsAttributes({
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.topic.override", description = "Overrides the processor configuration for topic."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.qos.override", description = "Overrides the processor configuration for quality of service."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.retained.override", description = "Overrides the processor configuration for retaining a published state in the AWS shadow.")
    +})
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.mqtt.exception", description = "Error details")
    +})
    +public class PutAWSIoT extends AbstractAWSIoTProcessor {
    +    private final static String PROP_NAME_RETAINED = "aws.iot.mqtt.retained";
    +    private final static String ATTR_NAME_TOPIC = PROP_NAME_TOPIC + ".override";
    +    private final static String ATTR_NAME_QOS = PROP_NAME_QOS + ".override";
    +    private final static String ATTR_NAME_RETAINED = PROP_NAME_RETAINED + ".override";
    +    private final static String ATTR_NAME_EXCEPTION = "aws.iot.mqtt.exception";
    +    private final static Boolean PROP_DEFAULT_RETAINED = false;
    +    private Boolean shouldRetain;
    +
    +    public static final PropertyDescriptor PROP_RETAINED = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_RETAINED)
    +            .description("For messages being published, a true setting indicates that the MQTT server " +
    +                    "should retain a copy of the message. The message will then be transmitted to new " +
    +                    "subscribers to a topic that matches the message topic. For subscribers registering " +
    +                    "a new subscription, the flag being true indicates that the received message is not " +
    +                    "a new one, but one that has been retained by the MQTT server.")
    +            .required(true)
    +            .defaultValue(PROP_DEFAULT_RETAINED.toString())
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_QOS,
    +                    PROP_TOPIC,
    +                    PROP_RETAINED,
    +                    PROP_ENDPOINT,
    +                    PROP_KEEPALIVE,
    +                    PROP_CLIENT,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    TIMEOUT,
    +                    REGION));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        shouldRetain = context.getProperty(PROP_RETAINED).isSet() ? context.getProperty(PROP_RETAINED).asBoolean() : PROP_DEFAULT_RETAINED;
    +        init(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        // check if MQTT-connection is about to expire
    +        if (isConnectionAboutToExpire()) {
    +            // renew connection
    +            mqttClient = connect(context);
    +        }
    +        // get flowfile
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +        Map<String, String> attributes = flowFile.getAttributes();
    +        // if provided override MQTT configuration with values from the corresponding message attributes
    +        String topic = attributes.containsKey(ATTR_NAME_TOPIC) ? attributes.get(ATTR_NAME_TOPIC) : awsTopic;
    +        Integer qos = attributes.containsKey(ATTR_NAME_QOS) ? Integer.parseInt(attributes.get(ATTR_NAME_QOS)) : awsQos;
    +        Boolean retained = attributes.containsKey(ATTR_NAME_RETAINED) ? Boolean.parseBoolean(attributes.get(ATTR_NAME_RETAINED)) : shouldRetain;
    +        // get message content
    +        final ByteArrayOutputStream fileContentStream = new ByteArrayOutputStream();
    +        session.exportTo(flowFile, fileContentStream);
    +
    +        try {
    +            // publish messages to mqtt-topic(s)
    +            mqttClient.publish(topic, fileContentStream.toByteArray(), qos, retained);
    +            session.transfer(flowFile, REL_SUCCESS);
    +            session.getProvenanceReporter().send(flowFile, awsEndpoint + "(" + awsClientId + ")");
    +        } catch (MqttException e) {
    +            getLogger().error("Error while initially subscribing to topics with client " + mqttClient.getClientId() + " caused by " + e.getMessage());
    +            flowFile = session.putAttribute(flowFile, ATTR_NAME_EXCEPTION, e.getMessage());
    --- End diff --
    
    Is there a specific reason/use-case you did this? I've never seen a processor add an attribute for an exception message before


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65388471
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PutAWSIoT.java ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.Map;
    +
    +@Tags({"Amazon", "AWS", "IOT", "MQTT", "Websockets", "Put", "Publish", "Send"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Publishes messages to MQTT-topic(s) of AWS IoT. The processor keeps open a WebSocket connection and will automatically renew the " +
    +        "connection to overcome Amazon's service limit on maximum connection duration. Most of the " +
    +        "configuration can be overridden by values coming in as message attributes. This applies for " +
    +        "the topic (corresponding message attribute is \"aws.iot.mqtt.topic.override\"), the qos-level " +
    +        "(\"aws.iot.mqtt.qos.override\") and the retention (\"aws.iot.mqtt.retained.override\")")
    +@SeeAlso({ PutAWSIoTShadow.class })
    +@ReadsAttributes({
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.topic.override", description = "Overrides the processor configuration for topic."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.qos.override", description = "Overrides the processor configuration for quality of service."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.retained.override", description = "Overrides the processor configuration for retaining a published state in the AWS shadow.")
    +})
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.mqtt.exception", description = "Error details")
    +})
    +public class PutAWSIoT extends AbstractAWSIoTProcessor {
    +    private final static String PROP_NAME_RETAINED = "aws.iot.mqtt.retained";
    +    private final static String ATTR_NAME_TOPIC = PROP_NAME_TOPIC + ".override";
    +    private final static String ATTR_NAME_QOS = PROP_NAME_QOS + ".override";
    +    private final static String ATTR_NAME_RETAINED = PROP_NAME_RETAINED + ".override";
    +    private final static String ATTR_NAME_EXCEPTION = "aws.iot.mqtt.exception";
    +    private final static Boolean PROP_DEFAULT_RETAINED = false;
    +    private Boolean shouldRetain;
    +
    +    public static final PropertyDescriptor PROP_RETAINED = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_RETAINED)
    +            .description("For messages being published, a true setting indicates that the MQTT server " +
    +                    "should retain a copy of the message. The message will then be transmitted to new " +
    +                    "subscribers to a topic that matches the message topic. For subscribers registering " +
    +                    "a new subscription, the flag being true indicates that the received message is not " +
    +                    "a new one, but one that has been retained by the MQTT server.")
    +            .required(true)
    +            .defaultValue(PROP_DEFAULT_RETAINED.toString())
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_QOS,
    +                    PROP_TOPIC,
    +                    PROP_RETAINED,
    +                    PROP_ENDPOINT,
    +                    PROP_KEEPALIVE,
    +                    PROP_CLIENT,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    TIMEOUT,
    +                    REGION));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        shouldRetain = context.getProperty(PROP_RETAINED).isSet() ? context.getProperty(PROP_RETAINED).asBoolean() : PROP_DEFAULT_RETAINED;
    +        init(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        // check if MQTT-connection is about to expire
    +        if (isConnectionAboutToExpire()) {
    +            // renew connection
    +            mqttClient = connect(context);
    +        }
    +        // get flowfile
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +        Map<String, String> attributes = flowFile.getAttributes();
    +        // if provided override MQTT configuration with values from the corresponding message attributes
    +        String topic = attributes.containsKey(ATTR_NAME_TOPIC) ? attributes.get(ATTR_NAME_TOPIC) : awsTopic;
    +        Integer qos = attributes.containsKey(ATTR_NAME_QOS) ? Integer.parseInt(attributes.get(ATTR_NAME_QOS)) : awsQos;
    +        Boolean retained = attributes.containsKey(ATTR_NAME_RETAINED) ? Boolean.parseBoolean(attributes.get(ATTR_NAME_RETAINED)) : shouldRetain;
    +        // get message content
    +        final ByteArrayOutputStream fileContentStream = new ByteArrayOutputStream();
    +        session.exportTo(flowFile, fileContentStream);
    +
    +        try {
    +            // publish messages to mqtt-topic(s)
    +            mqttClient.publish(topic, fileContentStream.toByteArray(), qos, retained);
    +            session.transfer(flowFile, REL_SUCCESS);
    +            session.getProvenanceReporter().send(flowFile, awsEndpoint + "(" + awsClientId + ")");
    +        } catch (MqttException e) {
    +            getLogger().error("Error while initially subscribing to topics with client " + mqttClient.getClientId() + " caused by " + e.getMessage());
    +            flowFile = session.putAttribute(flowFile, ATTR_NAME_EXCEPTION, e.getMessage());
    --- End diff --
    
    i would recommend avoiding that pattern.  Especially in a case like this where the flow file isn't the problem.  This approach is a sort of convenience to make that information available to the user.  That is what bulletins are for so simply using the logger and error level will take care of it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by KayLerch <gi...@git.apache.org>.
Github user KayLerch commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65423267
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/MqttWebSocketAsyncClient.java ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot.util;
    +
    +import java.net.URI;
    +import java.util.concurrent.LinkedBlockingQueue;
    +
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttCallback;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +import org.eclipse.paho.client.mqttv3.TimerPingSender;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttSecurityException;
    +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    +import org.eclipse.paho.client.mqttv3.MqttMessage;
    +import org.eclipse.paho.client.mqttv3.internal.NetworkModule;
    +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    +
    +public class MqttWebSocketAsyncClient extends MqttAsyncClient implements MqttCallback {
    +
    +    protected volatile LinkedBlockingQueue<IoTMessage> awsQueuedMqttMessages = new LinkedBlockingQueue<IoTMessage>();
    +    protected final ProcessorLog logger;
    +    protected final String serverURI;
    +
    +    protected static String createDummyURI(String original) {
    +        if (!original.startsWith("ws:") && !original.startsWith("wss:")) {
    +            return original;
    +        }
    +        final URI uri = URI.create(original);
    +        return "tcp://DUMMY-" + uri.getHost() + ":"
    +                + (uri.getPort() > 0 ? uri.getPort() : 80);
    +    }
    +
    +    protected static boolean isDummyURI(String uri) {
    +        return uri.startsWith("tcp://DUMMY-");
    +    }
    +
    +    public MqttWebSocketAsyncClient(String serverURI, String clientId,
    +                                    ProcessorLog logger) throws MqttException {
    +        super(createDummyURI(serverURI), clientId, new MemoryPersistence(), new TimerPingSender());
    +        this.serverURI = serverURI;
    +        this.logger = logger;
    +        this.setCallback(this);
    +    }
    +
    +    @Override
    +    protected NetworkModule[] createNetworkModules(String address,
    +                                                   MqttConnectOptions options) throws MqttException{
    +        String[] serverURIs = options.getServerURIs();
    +        String[] array = serverURIs == null ? new String[] { address } :
    +            serverURIs.length == 0 ? new String[] { address }: serverURIs;
    +
    +        NetworkModule[] networkModules = new NetworkModule[array.length];
    +        for (int i = 0; i < array.length; i++) {
    +            networkModules[i] = createNetworkModule(array[i], options);
    +        }
    +        return networkModules;
    +    }
    +
    +    protected NetworkModule createNetworkModule(String input,
    +                                                MqttConnectOptions options) throws MqttException,
    +            MqttSecurityException {
    +        final String address = isDummyURI(input) ? this.serverURI : input;
    +        if (!address.startsWith("ws:") && !address.startsWith("wss:")) {
    +            return super.createNetworkModules(address, options)[0];
    +        }
    +
    +        final String subProtocol = (options.getMqttVersion() == MqttConnectOptions.MQTT_VERSION_3_1) ? "mqttv3.1" : "mqtt";
    +        return newWebSocketNetworkModule(URI.create(address), subProtocol, options);
    +    }
    +
    +    protected NetworkModule newWebSocketNetworkModule(URI uri,
    +                                                      String subProtocol, MqttConnectOptions options) {
    +        final WebSocketNetworkModule netModule = new WebSocketNetworkModule(
    +                uri, subProtocol, getClientId());
    +        netModule.setConnectTimeout(options.getConnectionTimeout());
    +        return netModule;
    +    }
    +
    +    public LinkedBlockingQueue<IoTMessage> getAwsQueuedMqttMessages() {
    +        return awsQueuedMqttMessages;
    +    }
    +
    +    @Override
    +    public void connectionLost(Throwable t) {
    +        logger.error("Connection to " + this.getServerURI() + " lost with cause: " + t.getMessage());
    +    }
    +
    +    @Override
    +    public void deliveryComplete(IMqttDeliveryToken token) {
    +    }
    +
    +    @Override
    +    public void messageArrived(String topic, MqttMessage message) throws Exception {
    +        logger.info("Message arrived from topic: " + topic);
    --- End diff --
    
    Changed it to debug.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/349
  
    A bit nit picky and I may be alone here, but I find putting "if" statements and their content on the same line a bit hard to read when trying to read someone else's code. For example line 107 of GetAWSIoT[1]. When reviewing it's nice to distinctly see "if" statements condition and content. 
    
    I would prefer the bit more verbose but easier consume longer format:
    `if (awsQos == 0) {
        mqttClient.unsubscribe(awsTopic);
    }'
    
    [1] https://github.com/apache/nifi/blob/d46b01c7edf9089a944c3fc1c71b046cd19a04c2/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/GetAWSIoT.java#L107-L107


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65387577
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PutAWSIoT.java ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.Map;
    +
    +@Tags({"Amazon", "AWS", "IOT", "MQTT", "Websockets", "Put", "Publish", "Send"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Publishes messages to MQTT-topic(s) of AWS IoT. The processor keeps open a WebSocket connection and will automatically renew the " +
    +        "connection to overcome Amazon's service limit on maximum connection duration. Most of the " +
    +        "configuration can be overridden by values coming in as message attributes. This applies for " +
    +        "the topic (corresponding message attribute is \"aws.iot.mqtt.topic.override\"), the qos-level " +
    +        "(\"aws.iot.mqtt.qos.override\") and the retention (\"aws.iot.mqtt.retained.override\")")
    +@SeeAlso({ PutAWSIoTShadow.class })
    +@ReadsAttributes({
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.topic.override", description = "Overrides the processor configuration for topic."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.qos.override", description = "Overrides the processor configuration for quality of service."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.retained.override", description = "Overrides the processor configuration for retaining a published state in the AWS shadow.")
    +})
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.mqtt.exception", description = "Error details")
    +})
    +public class PutAWSIoT extends AbstractAWSIoTProcessor {
    +    private final static String PROP_NAME_RETAINED = "aws.iot.mqtt.retained";
    +    private final static String ATTR_NAME_TOPIC = PROP_NAME_TOPIC + ".override";
    +    private final static String ATTR_NAME_QOS = PROP_NAME_QOS + ".override";
    +    private final static String ATTR_NAME_RETAINED = PROP_NAME_RETAINED + ".override";
    +    private final static String ATTR_NAME_EXCEPTION = "aws.iot.mqtt.exception";
    +    private final static Boolean PROP_DEFAULT_RETAINED = false;
    +    private Boolean shouldRetain;
    +
    +    public static final PropertyDescriptor PROP_RETAINED = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_RETAINED)
    +            .description("For messages being published, a true setting indicates that the MQTT server " +
    +                    "should retain a copy of the message. The message will then be transmitted to new " +
    +                    "subscribers to a topic that matches the message topic. For subscribers registering " +
    +                    "a new subscription, the flag being true indicates that the received message is not " +
    +                    "a new one, but one that has been retained by the MQTT server.")
    +            .required(true)
    +            .defaultValue(PROP_DEFAULT_RETAINED.toString())
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_QOS,
    +                    PROP_TOPIC,
    +                    PROP_RETAINED,
    +                    PROP_ENDPOINT,
    +                    PROP_KEEPALIVE,
    +                    PROP_CLIENT,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    TIMEOUT,
    +                    REGION));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        shouldRetain = context.getProperty(PROP_RETAINED).isSet() ? context.getProperty(PROP_RETAINED).asBoolean() : PROP_DEFAULT_RETAINED;
    +        init(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        // check if MQTT-connection is about to expire
    +        if (isConnectionAboutToExpire()) {
    +            // renew connection
    +            mqttClient = connect(context);
    +        }
    +        // get flowfile
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +        Map<String, String> attributes = flowFile.getAttributes();
    +        // if provided override MQTT configuration with values from the corresponding message attributes
    +        String topic = attributes.containsKey(ATTR_NAME_TOPIC) ? attributes.get(ATTR_NAME_TOPIC) : awsTopic;
    +        Integer qos = attributes.containsKey(ATTR_NAME_QOS) ? Integer.parseInt(attributes.get(ATTR_NAME_QOS)) : awsQos;
    +        Boolean retained = attributes.containsKey(ATTR_NAME_RETAINED) ? Boolean.parseBoolean(attributes.get(ATTR_NAME_RETAINED)) : shouldRetain;
    +        // get message content
    +        final ByteArrayOutputStream fileContentStream = new ByteArrayOutputStream();
    +        session.exportTo(flowFile, fileContentStream);
    +
    +        try {
    +            // publish messages to mqtt-topic(s)
    +            mqttClient.publish(topic, fileContentStream.toByteArray(), qos, retained);
    +            session.transfer(flowFile, REL_SUCCESS);
    +            session.getProvenanceReporter().send(flowFile, awsEndpoint + "(" + awsClientId + ")");
    +        } catch (MqttException e) {
    +            getLogger().error("Error while initially subscribing to topics with client " + mqttClient.getClientId() + " caused by " + e.getMessage());
    --- End diff --
    
    Code is in the PutAWSIoT processor but message says "Error while initially subscribing..."


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by KayLerch <gi...@git.apache.org>.
Github user KayLerch commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65422907
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PutAWSIoT.java ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.Map;
    +
    +@Tags({"Amazon", "AWS", "IOT", "MQTT", "Websockets", "Put", "Publish", "Send"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Publishes messages to MQTT-topic(s) of AWS IoT. The processor keeps open a WebSocket connection and will automatically renew the " +
    +        "connection to overcome Amazon's service limit on maximum connection duration. Most of the " +
    +        "configuration can be overridden by values coming in as message attributes. This applies for " +
    +        "the topic (corresponding message attribute is \"aws.iot.mqtt.topic.override\"), the qos-level " +
    +        "(\"aws.iot.mqtt.qos.override\") and the retention (\"aws.iot.mqtt.retained.override\")")
    +@SeeAlso({ PutAWSIoTShadow.class })
    +@ReadsAttributes({
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.topic.override", description = "Overrides the processor configuration for topic."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.qos.override", description = "Overrides the processor configuration for quality of service."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.retained.override", description = "Overrides the processor configuration for retaining a published state in the AWS shadow.")
    +})
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.mqtt.exception", description = "Error details")
    +})
    +public class PutAWSIoT extends AbstractAWSIoTProcessor {
    +    private final static String PROP_NAME_RETAINED = "aws.iot.mqtt.retained";
    +    private final static String ATTR_NAME_TOPIC = PROP_NAME_TOPIC + ".override";
    +    private final static String ATTR_NAME_QOS = PROP_NAME_QOS + ".override";
    +    private final static String ATTR_NAME_RETAINED = PROP_NAME_RETAINED + ".override";
    +    private final static String ATTR_NAME_EXCEPTION = "aws.iot.mqtt.exception";
    +    private final static Boolean PROP_DEFAULT_RETAINED = false;
    +    private Boolean shouldRetain;
    +
    +    public static final PropertyDescriptor PROP_RETAINED = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_RETAINED)
    +            .description("For messages being published, a true setting indicates that the MQTT server " +
    +                    "should retain a copy of the message. The message will then be transmitted to new " +
    +                    "subscribers to a topic that matches the message topic. For subscribers registering " +
    +                    "a new subscription, the flag being true indicates that the received message is not " +
    +                    "a new one, but one that has been retained by the MQTT server.")
    +            .required(true)
    +            .defaultValue(PROP_DEFAULT_RETAINED.toString())
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_QOS,
    +                    PROP_TOPIC,
    +                    PROP_RETAINED,
    +                    PROP_ENDPOINT,
    +                    PROP_KEEPALIVE,
    +                    PROP_CLIENT,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    TIMEOUT,
    +                    REGION));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        shouldRetain = context.getProperty(PROP_RETAINED).isSet() ? context.getProperty(PROP_RETAINED).asBoolean() : PROP_DEFAULT_RETAINED;
    +        init(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        // check if MQTT-connection is about to expire
    +        if (isConnectionAboutToExpire()) {
    +            // renew connection
    +            mqttClient = connect(context);
    +        }
    +        // get flowfile
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +        Map<String, String> attributes = flowFile.getAttributes();
    +        // if provided override MQTT configuration with values from the corresponding message attributes
    +        String topic = attributes.containsKey(ATTR_NAME_TOPIC) ? attributes.get(ATTR_NAME_TOPIC) : awsTopic;
    +        Integer qos = attributes.containsKey(ATTR_NAME_QOS) ? Integer.parseInt(attributes.get(ATTR_NAME_QOS)) : awsQos;
    +        Boolean retained = attributes.containsKey(ATTR_NAME_RETAINED) ? Boolean.parseBoolean(attributes.get(ATTR_NAME_RETAINED)) : shouldRetain;
    +        // get message content
    +        final ByteArrayOutputStream fileContentStream = new ByteArrayOutputStream();
    +        session.exportTo(flowFile, fileContentStream);
    +
    +        try {
    +            // publish messages to mqtt-topic(s)
    +            mqttClient.publish(topic, fileContentStream.toByteArray(), qos, retained);
    +            session.transfer(flowFile, REL_SUCCESS);
    +            session.getProvenanceReporter().send(flowFile, awsEndpoint + "(" + awsClientId + ")");
    +        } catch (MqttException e) {
    +            getLogger().error("Error while initially subscribing to topics with client " + mqttClient.getClientId() + " caused by " + e.getMessage());
    +            flowFile = session.putAttribute(flowFile, ATTR_NAME_EXCEPTION, e.getMessage());
    --- End diff --
    
    I swear I saw it somewhere and thought it is a practice of exposing errors in NiFi. Actually this is not a good pattern at all. Removed it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi AWS IoT processors

Posted by KayLerch <gi...@git.apache.org>.
Github user KayLerch commented on the pull request:

    https://github.com/apache/nifi/pull/349#issuecomment-217645675
  
    @joewitt @JPercivall : So I renamed the processors to
    GetAWSIoT                
    PutAWSIoT                 
    GetAWSIoTShadow
    PutAWSIoTShadow
    
    @apiri : I resolved several check-style warnings and successfully made a clean install with contrib-check. Should work now.
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65420622
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PutAWSIoTShadow.java ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import com.amazonaws.services.iotdata.AWSIotDataClient;
    +import com.amazonaws.services.iotdata.model.UpdateThingShadowRequest;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +
    +@Tags({"Amazon", "AWS", "IOT", "Shadow", "Put", "Update", "Write"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Sets state of a thing in AWS IoT by updating the shadow. You can dynamically set a thing-name " +
    +        "when overriding the processor-configuration with a message-attribute \"aws.iot.thing.override\".")
    +@SeeAlso({ PutAWSIoT.class })
    +@ReadsAttributes({
    +        @ReadsAttribute(attribute = "aws.iot.thing.override", description = "Overrides the processor configuration for topic."),
    +})
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.thing", description = "Underlying MQTT quality-of-service.")
    +})
    +public class PutAWSIoTShadow extends AbstractAWSIoTShadowProcessor {
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_THING,
    +                    TIMEOUT,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    PROXY_HOST,
    +                    PROXY_HOST_PORT,
    +                    REGION));
    +
    +    private final static String ATTR_NAME_THING = PROP_NAME_THING + ".override";
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    --- End diff --
    
    Only the success relationship is exposed, should also include the failure relationship.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65418924
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/GetAWSIoT.java ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.aws.iot.util.IoTMessage;
    +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.LinkedList;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +@Tags({"Amazon", "AWS", "IOT", "MQTT", "Websockets", "Get", "Subscribe", "Receive"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("Subscribes to and receives messages from MQTT-topic(s) of AWS IoT." +
    +    "The processor keeps open a WebSocket connection and will automatically renew the " +
    +    "connection to overcome Amazon's service limit on maximum connection duration. Depending on " +
    +    "your set up QoS the processor will miss some messages (QoS=0) or receives messages twice (QoS=1) " +
    +    "while reconnecting to AWS IoT WebSocket endpoint. We strongly recommend you to make use of " +
    +    "processor isolation as concurrent subscriptions to an MQTT topic result in multiple message receiptions.")
    +@SeeAlso({ GetAWSIoTShadow.class })
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.mqtt.endpoint", description = "AWS endpoint this message was received from."),
    +        @WritesAttribute(attribute = "aws.iot.mqtt.topic", description = "MQTT topic this message was received from."),
    +        @WritesAttribute(attribute = "aws.iot.mqtt.client", description = "MQTT client which received the message."),
    +        @WritesAttribute(attribute = "aws.iot.mqtt.qos", description = "Underlying MQTT quality-of-service.")
    +})
    +public class GetAWSIoT extends AbstractAWSIoTProcessor {
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_QOS,
    +                    PROP_TOPIC,
    +                    PROP_ENDPOINT,
    +                    PROP_KEEPALIVE,
    +                    PROP_CLIENT,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    REGION));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        // init to build up mqtt connection over web-sockets
    +        init(context);
    +        if (mqttClient != null && mqttClient.isConnected()) {
    +            try {
    +                // subscribe to topic with configured qos in order to start receiving messages
    +                mqttClient.subscribe(awsTopic, awsQos);
    +            } catch (MqttException e) {
    +                getLogger().error("Error while subscribing to topic " + awsTopic + " with client-id " + mqttClient.getClientId() + " caused by " + e.getMessage());
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        final List messageList = new LinkedList();
    +        // check if connection is about to terminate
    +        if (isConnectionAboutToExpire()) {
    +            MqttWebSocketAsyncClient _mqttClient = null;
    +            try {
    +                // before subscribing to the topic with new connection first unsubscribe
    +                // old connection from same topic if subscription is set to QoS 0
    +                if (awsQos == 0) mqttClient.unsubscribe(awsTopic);
    --- End diff --
    
    The client is not unsubscribed first when using QoS 1 but is when using QoS 0. Are there any benefits to unsubscribing first?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65424552
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTProcessor.java ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.iot.AWSIotClient;
    +import org.apache.commons.lang3.RandomStringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +import org.apache.nifi.processors.aws.iot.util.AWS4Signer;
    +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.util.Date;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class AbstractAWSIoTProcessor extends AbstractAWSCredentialsProviderProcessor<AWSIotClient> {
    +    static final String PROP_NAME_ENDPOINT = "aws.iot.endpoint";
    +    static final String PROP_NAME_CLIENT = "aws.iot.mqtt.client";
    +    static final String PROP_NAME_KEEPALIVE = "aws.iot.mqtt.keepalive";
    +    static final String PROP_NAME_TOPIC = "aws.iot.mqtt.topic";
    +    static final String PROP_NAME_QOS = "aws.iot.mqtt.qos";
    --- End diff --
    
    The names of the properties should be meaningful to all users and then more details should be in the description. These prop names are very ambiguous and overly technical looking for the typical user. If we keep the current way of overriding the properties (commented on that else where), these attribute names should be in the description of the property instead of the name.
    
    Related, do these "technical" names have value outside of the specific NiFi attributes? For example, in the Kafka processors there are properties which directly correlate to properties used elsewhere with similarly technical names and it's important to let users know that. If so the correlation should be noted in the description.
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65411771
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/GetAWSIoT.java ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.aws.iot.util.IoTMessage;
    +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.LinkedList;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +@Tags({"Amazon", "AWS", "IOT", "MQTT", "Websockets", "Get", "Subscribe", "Receive"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("Subscribes to and receives messages from MQTT-topic(s) of AWS IoT." +
    +    "The processor keeps open a WebSocket connection and will automatically renew the " +
    +    "connection to overcome Amazon's service limit on maximum connection duration. Depending on " +
    +    "your set up QoS the processor will miss some messages (QoS=0) or receives messages twice (QoS=1) " +
    +    "while reconnecting to AWS IoT WebSocket endpoint. We strongly recommend you to make use of " +
    +    "processor isolation as concurrent subscriptions to an MQTT topic result in multiple message receiptions.")
    +@SeeAlso({ GetAWSIoTShadow.class })
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.mqtt.endpoint", description = "AWS endpoint this message was received from."),
    +        @WritesAttribute(attribute = "aws.iot.mqtt.topic", description = "MQTT topic this message was received from."),
    +        @WritesAttribute(attribute = "aws.iot.mqtt.client", description = "MQTT client which received the message."),
    +        @WritesAttribute(attribute = "aws.iot.mqtt.qos", description = "Underlying MQTT quality-of-service.")
    +})
    +public class GetAWSIoT extends AbstractAWSIoTProcessor {
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_QOS,
    +                    PROP_TOPIC,
    +                    PROP_ENDPOINT,
    +                    PROP_KEEPALIVE,
    +                    PROP_CLIENT,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    REGION));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        // init to build up mqtt connection over web-sockets
    +        init(context);
    +        if (mqttClient != null && mqttClient.isConnected()) {
    +            try {
    +                // subscribe to topic with configured qos in order to start receiving messages
    +                mqttClient.subscribe(awsTopic, awsQos);
    +            } catch (MqttException e) {
    +                getLogger().error("Error while subscribing to topic " + awsTopic + " with client-id " + mqttClient.getClientId() + " caused by " + e.getMessage());
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        final List messageList = new LinkedList();
    +        // check if connection is about to terminate
    +        if (isConnectionAboutToExpire()) {
    +            MqttWebSocketAsyncClient _mqttClient = null;
    +            try {
    +                // before subscribing to the topic with new connection first unsubscribe
    +                // old connection from same topic if subscription is set to QoS 0
    +                if (awsQos == 0) mqttClient.unsubscribe(awsTopic);
    +                // establish a second connection
    +                _mqttClient = connect(context);
    +                // now subscribe to topic with new connection
    +                _mqttClient.subscribe(awsTopic, awsQos);
    +                // between re-subscription and disconnect from old connection
    +                // QoS=0 subscription eventually lose some messages
    +                // QoS=1 subscription eventually receive some messages twice
    +                // now terminate old connection
    +                mqttClient.disconnect();
    +            } catch (MqttException e) {
    +                getLogger().error("Error while renewing connection with client " + mqttClient.getClientId() + " caused by " + e.getMessage());
    +            } finally {
    +                // grab messages left over from old connection
    +                mqttClient.getAwsQueuedMqttMessages().drainTo(messageList);
    +                // now set the new connection as the default connection
    +                if (_mqttClient != null) mqttClient = _mqttClient;
    +            }
    +        } else {
    +            // grab messages which queued up since last run
    +            mqttClient.getAwsQueuedMqttMessages().drainTo(messageList);
    --- End diff --
    
    What is the benefit of draining the queue into the local messageList? One distinct problem I see is after the queue is drained into the local variable `messageList` and an exception occurs, all messages remaining in the queue will be lost.
    
    What I would suggest doing is to peek at the top of the queue and then remove it only after it has been transferred successfully, like in ConsumeMQTT[1]
    
    [1] https://github.com/apache/nifi/blob/f47af1ce8336c9305916f00738976f3505b01b0b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java#L268


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the pull request:

    https://github.com/apache/nifi/pull/349#issuecomment-214464542
  
    @KayLerch this bundle looks awesome! 
    
    One thing, I would prefer to rename the processors to have AWS in their name. We will more than likely also have general MQTT processors unrelated to AWS and the names may be confusing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65412507
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/MqttWebSocketAsyncClient.java ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot.util;
    +
    +import java.net.URI;
    +import java.util.concurrent.LinkedBlockingQueue;
    +
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttCallback;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +import org.eclipse.paho.client.mqttv3.TimerPingSender;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttSecurityException;
    +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    +import org.eclipse.paho.client.mqttv3.MqttMessage;
    +import org.eclipse.paho.client.mqttv3.internal.NetworkModule;
    +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    +
    +public class MqttWebSocketAsyncClient extends MqttAsyncClient implements MqttCallback {
    +
    +    protected volatile LinkedBlockingQueue<IoTMessage> awsQueuedMqttMessages = new LinkedBlockingQueue<IoTMessage>();
    +    protected final ProcessorLog logger;
    +    protected final String serverURI;
    +
    +    protected static String createDummyURI(String original) {
    --- End diff --
    
    What is the point of this dummyURI?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65420794
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PutAWSIoT.java ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.Map;
    +
    +@Tags({"Amazon", "AWS", "IOT", "MQTT", "Websockets", "Put", "Publish", "Send"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Publishes messages to MQTT-topic(s) of AWS IoT. The processor keeps open a WebSocket connection and will automatically renew the " +
    +        "connection to overcome Amazon's service limit on maximum connection duration. Most of the " +
    +        "configuration can be overridden by values coming in as message attributes. This applies for " +
    +        "the topic (corresponding message attribute is \"aws.iot.mqtt.topic.override\"), the qos-level " +
    +        "(\"aws.iot.mqtt.qos.override\") and the retention (\"aws.iot.mqtt.retained.override\")")
    +@SeeAlso({ PutAWSIoTShadow.class })
    +@ReadsAttributes({
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.topic.override", description = "Overrides the processor configuration for topic."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.qos.override", description = "Overrides the processor configuration for quality of service."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.retained.override", description = "Overrides the processor configuration for retaining a published state in the AWS shadow.")
    +})
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.mqtt.exception", description = "Error details")
    +})
    +public class PutAWSIoT extends AbstractAWSIoTProcessor {
    +    private final static String PROP_NAME_RETAINED = "aws.iot.mqtt.retained";
    +    private final static String ATTR_NAME_TOPIC = PROP_NAME_TOPIC + ".override";
    +    private final static String ATTR_NAME_QOS = PROP_NAME_QOS + ".override";
    +    private final static String ATTR_NAME_RETAINED = PROP_NAME_RETAINED + ".override";
    +    private final static String ATTR_NAME_EXCEPTION = "aws.iot.mqtt.exception";
    +    private final static Boolean PROP_DEFAULT_RETAINED = false;
    +    private Boolean shouldRetain;
    +
    +    public static final PropertyDescriptor PROP_RETAINED = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_RETAINED)
    +            .description("For messages being published, a true setting indicates that the MQTT server " +
    +                    "should retain a copy of the message. The message will then be transmitted to new " +
    +                    "subscribers to a topic that matches the message topic. For subscribers registering " +
    +                    "a new subscription, the flag being true indicates that the received message is not " +
    +                    "a new one, but one that has been retained by the MQTT server.")
    +            .required(true)
    +            .defaultValue(PROP_DEFAULT_RETAINED.toString())
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_QOS,
    +                    PROP_TOPIC,
    +                    PROP_RETAINED,
    +                    PROP_ENDPOINT,
    +                    PROP_KEEPALIVE,
    +                    PROP_CLIENT,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    TIMEOUT,
    +                    REGION));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    --- End diff --
    
    Only the success relationship is exposed, should also include the failure relationship.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by KayLerch <gi...@git.apache.org>.
Github user KayLerch commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65421228
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTProcessor.java ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.iot.AWSIotClient;
    +import org.apache.commons.lang3.RandomStringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +import org.apache.nifi.processors.aws.iot.util.AWS4Signer;
    +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.util.Date;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class AbstractAWSIoTProcessor extends AbstractAWSCredentialsProviderProcessor<AWSIotClient> {
    +    static final String PROP_NAME_ENDPOINT = "aws.iot.endpoint";
    +    static final String PROP_NAME_CLIENT = "aws.iot.mqtt.client";
    +    static final String PROP_NAME_KEEPALIVE = "aws.iot.mqtt.keepalive";
    +    static final String PROP_NAME_TOPIC = "aws.iot.mqtt.topic";
    +    static final String PROP_NAME_QOS = "aws.iot.mqtt.qos";
    +    /**
    +     * Amazon's current service limit on websocket connection duration
    +     */
    +    static final Integer PROP_DEFAULT_KEEPALIVE = 60 * 60 * 24;
    +    /**
    +     * When to start indicating the need for connection renewal (in seconds before actual termination)
    +     */
    +    static final Integer DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION = 20;
    +    static final String PROP_DEFAULT_CLIENT = AbstractAWSIoTProcessor.class.getSimpleName();
    +    /**
    +     * Default QoS level for message delivery
    +     */
    +    static final Integer DEFAULT_QOS = 0;
    +    String awsTopic;
    +    int awsQos;
    +    MqttWebSocketAsyncClient mqttClient;
    +    String awsEndpoint;
    +    String awsClientId;
    +
    +    private String awsRegion;
    +    private Integer awsKeepAliveSeconds;
    +    private Date dtLastConnect;
    +
    +    public static final PropertyDescriptor PROP_ENDPOINT = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_ENDPOINT)
    +            .description("Your endpoint identifier in AWS IoT (e.g. A1B71MLXKNCXXX)")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_CLIENT = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_CLIENT)
    +            .description("MQTT client ID to use. Under the cover your input will be extended by a random " +
    +                    "string to ensure a unique id among all conntected clients.")
    +            .required(false)
    +            .defaultValue(PROP_DEFAULT_CLIENT)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_KEEPALIVE = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_KEEPALIVE)
    +            .description("Seconds a WebSocket-connection remains open after automatically renewing it. " +
    +                    "This is neccessary due to Amazon's service limit on WebSocket connection duration. " +
    +                    "As soon as the limit is changed by Amazon you can adjust the value here. Never use " +
    +                    "a duration longer than supported by Amazon. This processor renews the connection " +
    +                    "" + DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION + " seconds before the " +
    +                    "actual expiration. If no value set the default will be " + PROP_DEFAULT_KEEPALIVE + ".")
    +            .required(false)
    +            .defaultValue(PROP_DEFAULT_KEEPALIVE.toString())
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_TOPIC = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_TOPIC)
    +            .description("MQTT topic to work with. (pattern: $aws/things/mything/shadow/update).")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_QOS)
    +            .description("Decide for at most once (0) or at least once (1) message-receiption. " +
    +                    "Currently AWS IoT does not support QoS-level 2. If no value set the default QoS " +
    +                    "is " + DEFAULT_QOS + ".")
    +            .required(false)
    +            .allowableValues("0", "1")
    +            .defaultValue("0")
    --- End diff --
    
    Have it. Just forgot ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/349
  
    I actually already committed general MQTT processors [1] and they have some good "best practices" with regards to publishing/consuming messages and MQTT I think would be good to follow here. The first few I have are below:
    
    First in regards to the internal queue of messages, there needs to be some kind of limit on it. If a user doesn't set a fast enough trigger duration the queue could become enormous and, since all the messages are held in memory, potentially bringing NiFi to a halt. Here is the property I add to ConsumeMQTT[2].
    
    This leads to the second, what to do with the internal queue when the processor is stopped. There will potentially be some data left in the internal queue when the processor is stopped and if that data is not handled it will more than likely be lost. In the OnStopped I actually transfer all messages in the queue (just like is done int he OnTrigger)[3].
    
    In the PutAWSIoT processor, you read the attributes from the FlowFile to essentially overwrite Processor defined properties. Being able to have different QoS/Topics/Retained different messages is very important and the most user friendly way to do this is through Expression Language. Check out how I do it here[4]. This gives the user to potentially dynamically select the QoS/Topic/Retained and doesn't do things automatically for the user. 
    
    [1] https://github.com/apache/nifi/tree/f47af1ce8336c9305916f00738976f3505b01b0b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt
    [2] https://github.com/apache/nifi/blob/f47af1ce8336c9305916f00738976f3505b01b0b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java#L111
    [3] https://github.com/apache/nifi/blob/f47af1ce8336c9305916f00738976f3505b01b0b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java#L233
    [4] https://github.com/apache/nifi/blob/f47af1ce8336c9305916f00738976f3505b01b0b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java#L63


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65416412
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTProcessor.java ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.iot.AWSIotClient;
    +import org.apache.commons.lang3.RandomStringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +import org.apache.nifi.processors.aws.iot.util.AWS4Signer;
    +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.util.Date;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class AbstractAWSIoTProcessor extends AbstractAWSCredentialsProviderProcessor<AWSIotClient> {
    +    static final String PROP_NAME_ENDPOINT = "aws.iot.endpoint";
    +    static final String PROP_NAME_CLIENT = "aws.iot.mqtt.client";
    +    static final String PROP_NAME_KEEPALIVE = "aws.iot.mqtt.keepalive";
    +    static final String PROP_NAME_TOPIC = "aws.iot.mqtt.topic";
    +    static final String PROP_NAME_QOS = "aws.iot.mqtt.qos";
    +    /**
    +     * Amazon's current service limit on websocket connection duration
    +     */
    +    static final Integer PROP_DEFAULT_KEEPALIVE = 60 * 60 * 24;
    +    /**
    +     * When to start indicating the need for connection renewal (in seconds before actual termination)
    +     */
    +    static final Integer DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION = 20;
    +    static final String PROP_DEFAULT_CLIENT = AbstractAWSIoTProcessor.class.getSimpleName();
    +    /**
    +     * Default QoS level for message delivery
    +     */
    +    static final Integer DEFAULT_QOS = 0;
    +    String awsTopic;
    +    int awsQos;
    +    MqttWebSocketAsyncClient mqttClient;
    +    String awsEndpoint;
    +    String awsClientId;
    +
    +    private String awsRegion;
    +    private Integer awsKeepAliveSeconds;
    +    private Date dtLastConnect;
    +
    +    public static final PropertyDescriptor PROP_ENDPOINT = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_ENDPOINT)
    +            .description("Your endpoint identifier in AWS IoT (e.g. A1B71MLXKNCXXX)")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_CLIENT = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_CLIENT)
    +            .description("MQTT client ID to use. Under the cover your input will be extended by a random " +
    +                    "string to ensure a unique id among all conntected clients.")
    +            .required(false)
    +            .defaultValue(PROP_DEFAULT_CLIENT)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_KEEPALIVE = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_KEEPALIVE)
    +            .description("Seconds a WebSocket-connection remains open after automatically renewing it. " +
    --- End diff --
    
    Little confusing, maybe should read "The number of seconds a WebSocket-connection..."


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65423265
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/GetAWSIoTShadow.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import com.amazonaws.services.iotdata.AWSIotDataClient;
    +import com.amazonaws.services.iotdata.model.GetThingShadowRequest;
    +import com.amazonaws.services.iotdata.model.GetThingShadowResult;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +
    +@Tags({"Amazon", "AWS", "IOT", "Shadow", "Get"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@CapabilityDescription("Gets last persisted state of a thing in AWS IoT by reading out the shadow. " +
    +        "A shadow might change more often than you get triggered. In order to get every message send " +
    +        "out by a thing you better use GetAWSIoT processor. You can dynamically set a thing-name " +
    +        "when overriding the processor-configuration with a message-attribute \"aws.iot.thing.override\".")
    +@SeeAlso({ GetAWSIoT.class })
    +@ReadsAttributes({
    +        @ReadsAttribute(attribute = "aws.iot.thing.override", description = "Overrides the processor configuration for topic."),
    +})
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.thing", description = "Thing name in AWS IoT"),
    +})
    +public class GetAWSIoTShadow extends AbstractAWSIoTShadowProcessor {
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_THING,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    TIMEOUT,
    +                    PROXY_HOST,
    +                    PROXY_HOST_PORT,
    +                    REGION));
    +
    +    private final static String ATTR_NAME_THING = PROP_NAME_THING + ".override";
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        final AWSIotDataClient iotClient = this.getClient();
    +
    +        if (iotClient == null) {
    +            getLogger().error("AWS-Client was not initialized. See logs to find reasons.");
    +            return;
    +        }
    +        // get flowfile
    +        FlowFile flowFile = session.get();
    +        // if provided override configured thing name with name from corresponding message attribute
    +        String thingName = flowFile != null && flowFile.getAttributes().containsKey(ATTR_NAME_THING)
    +                        ? flowFile.getAttribute(ATTR_NAME_THING)
    +                        : context.getProperty(PROP_NAME_THING).getValue();
    +
    +        // ask shadow of the thing for last reported state by requesting the API of AWS
    +        final GetThingShadowRequest iotRequest = new GetThingShadowRequest().withThingName(thingName);
    +        final GetThingShadowResult iotResponse = iotClient.getThingShadow(iotRequest);
    +
    +        FlowFile flowFileOut = session.create();
    +        final Map<String, String> attributes = new HashMap<>();
    +        attributes.put(PROP_NAME_THING, thingName);
    +        flowFileOut = session.putAllAttributes(flowFileOut, attributes);
    +
    +        flowFileOut = session.write(flowFileOut, new OutputStreamCallback() {
    +            @Override
    +            public void process(final OutputStream out) throws IOException {
    +                out.write(iotResponse.getPayload().array());
    +            }
    +        });
    +        session.transfer(flowFileOut, REL_SUCCESS);
    +        session.commit();
    --- End diff --
    
    Should emit a "FETCH" provenance event when created using an incoming flowfile and "RECEIVE" when the processor is acting as a source.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi AWS IoT processors

Posted by KayLerch <gi...@git.apache.org>.
Github user KayLerch commented on the pull request:

    https://github.com/apache/nifi/pull/349#issuecomment-210913144
  
    Live demo for **PutIOTMqtt**-processor ;)
    https://twitter.com/KayLerch/status/721455415456882689



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #349: NIFI-1767 AWS IoT processors

Posted by trixpan <gi...@git.apache.org>.
Github user trixpan commented on the issue:

    https://github.com/apache/nifi/pull/349
  
    Thabk Kay, would it be possible to  close this PR? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65374220
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTProcessor.java ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.iot.AWSIotClient;
    +import org.apache.commons.lang3.RandomStringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +import org.apache.nifi.processors.aws.iot.util.AWS4Signer;
    +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.util.Date;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class AbstractAWSIoTProcessor extends AbstractAWSCredentialsProviderProcessor<AWSIotClient> {
    +    static final String PROP_NAME_ENDPOINT = "aws.iot.endpoint";
    +    static final String PROP_NAME_CLIENT = "aws.iot.mqtt.client";
    +    static final String PROP_NAME_KEEPALIVE = "aws.iot.mqtt.keepalive";
    +    static final String PROP_NAME_TOPIC = "aws.iot.mqtt.topic";
    +    static final String PROP_NAME_QOS = "aws.iot.mqtt.qos";
    +    /**
    +     * Amazon's current service limit on websocket connection duration
    +     */
    +    static final Integer PROP_DEFAULT_KEEPALIVE = 60 * 60 * 24;
    +    /**
    +     * When to start indicating the need for connection renewal (in seconds before actual termination)
    +     */
    +    static final Integer DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION = 20;
    +    static final String PROP_DEFAULT_CLIENT = AbstractAWSIoTProcessor.class.getSimpleName();
    +    /**
    +     * Default QoS level for message delivery
    +     */
    +    static final Integer DEFAULT_QOS = 0;
    +    String awsTopic;
    +    int awsQos;
    +    MqttWebSocketAsyncClient mqttClient;
    +    String awsEndpoint;
    +    String awsClientId;
    +
    +    private String awsRegion;
    +    private Integer awsKeepAliveSeconds;
    +    private Date dtLastConnect;
    +
    +    public static final PropertyDescriptor PROP_ENDPOINT = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_ENDPOINT)
    +            .description("Your endpoint identifier in AWS IoT (e.g. A1B71MLXKNCXXX)")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_CLIENT = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_CLIENT)
    +            .description("MQTT client ID to use. Under the cover your input will be extended by a random " +
    +                    "string to ensure a unique id among all conntected clients.")
    +            .required(false)
    +            .defaultValue(PROP_DEFAULT_CLIENT)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_KEEPALIVE = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_KEEPALIVE)
    +            .description("Seconds a WebSocket-connection remains open after automatically renewing it. " +
    +                    "This is neccessary due to Amazon's service limit on WebSocket connection duration. " +
    +                    "As soon as the limit is changed by Amazon you can adjust the value here. Never use " +
    +                    "a duration longer than supported by Amazon. This processor renews the connection " +
    +                    "" + DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION + " seconds before the " +
    +                    "actual expiration. If no value set the default will be " + PROP_DEFAULT_KEEPALIVE + ".")
    +            .required(false)
    +            .defaultValue(PROP_DEFAULT_KEEPALIVE.toString())
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_TOPIC = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_TOPIC)
    +            .description("MQTT topic to work with. (pattern: $aws/things/mything/shadow/update).")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_QOS)
    +            .description("Decide for at most once (0) or at least once (1) message-receiption. " +
    +                    "Currently AWS IoT does not support QoS-level 2. If no value set the default QoS " +
    +                    "is " + DEFAULT_QOS + ".")
    +            .required(false)
    +            .allowableValues("0", "1")
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    /**
    +     * Create client using credentials provider. This is the preferred way for creating clients
    +     */
    +    @Override
    +    protected AWSIotClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
    +        getLogger().info("Creating client using aws credentials provider ");
    +        // Actually this client is not needed. However, it is initialized due to the pattern of
    +        // AbstractAWSCredentialsProviderProcessor
    +        return new AWSIotClient(credentialsProvider, config);
    +    }
    +
    +    /**
    +     * Create client using AWSCredentails
    +     *
    +     * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
    +     */
    +    @Override
    +    protected AWSIotClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
    +        getLogger().info("Creating client using aws credentials ");
    +        // Actually this client is not needed. it is initialized due to the pattern of
    +        // AbstractAWSProcessor
    +        return new AWSIotClient(credentials, config);
    +    }
    +
    +    /**
    +     * Gets ready an MQTT client by connecting to a AWS IoT WebSocket endpoint specific to the properties
    +     * @param context processor context
    +     */
    +    void init(final ProcessContext context) {
    +        // read out properties
    +        awsEndpoint = context.getProperty(PROP_ENDPOINT).getValue();
    +        awsRegion = context.getProperty(REGION).getValue();
    +        awsClientId = context.getProperty(PROP_CLIENT).isSet() ? context.getProperty(PROP_CLIENT).getValue() : PROP_DEFAULT_CLIENT;
    +        awsKeepAliveSeconds = context.getProperty(PROP_KEEPALIVE).isSet() ? context.getProperty(PROP_KEEPALIVE).asInteger() : PROP_DEFAULT_KEEPALIVE;
    +        awsTopic = context.getProperty(PROP_TOPIC).getValue();
    +        awsQos = context.getProperty(PROP_QOS).isSet() ? context.getProperty(PROP_QOS).asInteger() : DEFAULT_QOS;
    +        // initialize and connect to mqtt endpoint
    +        mqttClient = connect(context);
    +    }
    +
    +    @OnStopped
    +    public void onStopped(final ProcessContext context) {
    +        try {
    +            mqttClient.disconnect();
    +        } catch (MqttException me) {
    +            getLogger().warn("MQTT " + me.getMessage());
    +        }
    +        getLogger().info("Disconnected");
    +    }
    +
    +    /**
    +     * Returns the lifetime-seconds of the established websocket-connection
    +     * @return seconds
    +     */
    +    long getConnectionDuration() {
    +        return dtLastConnect != null
    +                ? TimeUnit.MILLISECONDS.toSeconds(new Date().getTime() - dtLastConnect.getTime())
    +                : awsKeepAliveSeconds + 1;
    +    }
    +
    +    /**
    +     * In seconds get the remaining lifetime of the connection. It is not the actual time to
    +     * expiration but an advice to when it is worth renewing the connection.
    +     * @return seconds
    +     */
    +    long getRemainingConnectionLifetime() {
    +        return awsKeepAliveSeconds - DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION;
    +    }
    +
    +    /**
    +     * Indicates if WebSocket connection is about to expire. It gives the caller an advice
    +     * to renew the connection some time before the actual expiration.
    +     * @return Indication (if true caller should renew the connection)
    +     */
    +    boolean isConnectionAboutToExpire() {
    +        return getConnectionDuration() > getRemainingConnectionLifetime();
    +    }
    +
    +    /**
    +     * Connects to the websocket-endpoint over an MQTT client.
    +     * @param context processcontext
    +     * @return websocket connection client
    +     */
    +    MqttWebSocketAsyncClient connect(ProcessContext context) {
    +        getCredentialsProvider(context).refresh();
    +        AWSCredentials awsCredentials = getCredentialsProvider(context).getCredentials();
    +        MqttWebSocketAsyncClient _mqttClient = null;
    +
    +        // generate mqtt endpoint-address with authentication details
    +        String strEndpointAddress;
    +        try {
    +            strEndpointAddress = AWS4Signer.getAddress(awsRegion, awsEndpoint, awsCredentials);
    +        } catch (Exception e) {
    +            getLogger().error("Error while generating AWS endpoint-address caused by " + e.getMessage());
    +            return null;
    +        }
    +        // extend clientId with random string in order to ensure unique id per connection
    +        String clientId = awsClientId + RandomStringUtils.random(12, true, false);
    --- End diff --
    
    I'm on the fence about appending a random string to the client ID to ensure uniqueness. This could lead to laziness when creating the clientId and making it so the user doesn't know where a connection came from (when looking at the AWS UI). Wouldn't it be better to not append the random string and instead force a user to create a better clientIds?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/349
  
    Hey @KayLerch we're starting to wrap up the 0.7.0 release, did you get a chance to address these concerns?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/349
  
    @KayLerch unfortunate you won't be able to finish it for 0.7.0 but I'm looking forward to having it in 1.0.0! I'm going to change the status of the ticket from "Patch Available" to "In Progress".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65422861
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/GetAWSIoTShadow.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import com.amazonaws.services.iotdata.AWSIotDataClient;
    +import com.amazonaws.services.iotdata.model.GetThingShadowRequest;
    +import com.amazonaws.services.iotdata.model.GetThingShadowResult;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +
    +@Tags({"Amazon", "AWS", "IOT", "Shadow", "Get"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@CapabilityDescription("Gets last persisted state of a thing in AWS IoT by reading out the shadow. " +
    +        "A shadow might change more often than you get triggered. In order to get every message send " +
    +        "out by a thing you better use GetAWSIoT processor. You can dynamically set a thing-name " +
    +        "when overriding the processor-configuration with a message-attribute \"aws.iot.thing.override\".")
    +@SeeAlso({ GetAWSIoT.class })
    +@ReadsAttributes({
    +        @ReadsAttribute(attribute = "aws.iot.thing.override", description = "Overrides the processor configuration for topic."),
    +})
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.thing", description = "Thing name in AWS IoT"),
    +})
    +public class GetAWSIoTShadow extends AbstractAWSIoTShadowProcessor {
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_THING,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    TIMEOUT,
    +                    PROXY_HOST,
    +                    PROXY_HOST_PORT,
    +                    REGION));
    +
    +    private final static String ATTR_NAME_THING = PROP_NAME_THING + ".override";
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    --- End diff --
    
    This onTrigger potentially gets a FlowFile and creates a new one. In this instance where the processor can act as a source processor but also accepts input the logic to cover all use-cases gets a little tricky. For an example, check out the InvokeHTTP processor[1], specifically the check to see if there is a non-loop connection when the flowfile is null.
    
    In addition to checking to see if the processor should run, you must handle both FlowFiles (transfer or remove). In this case you should probably have three relationships one for the original when it fails, one for the original when it succeeds and one for the created shadow when it succeeds. 
    
    [1] https://github.com/apache/nifi/blob/eb6f9f0fec3042350864afee17a67ea62ef5f37a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java#L527-L527


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---