You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ra...@apache.org on 2015/09/21 18:26:25 UTC

[1/5] ignite git commit: IGNITE-535 (WIP) Implement MQTT Streamer.

Repository: ignite
Updated Branches:
  refs/heads/master 421a5234b -> 88acd318b


IGNITE-535 (WIP) Implement MQTT Streamer.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6b53f1bb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6b53f1bb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6b53f1bb

Branch: refs/heads/master
Commit: 6b53f1bb2699ae7a6ea8ae277de1bfc3ecbd7b6a
Parents: b80b171
Author: Raul Kripalani <ra...@apache.org>
Authored: Tue Sep 15 00:45:58 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Tue Sep 15 22:30:09 2015 +0100

----------------------------------------------------------------------
 modules/mqtt/pom.xml                            | 110 +++++++++
 .../apache/ignite/stream/mqtt/MqttStreamer.java | 243 +++++++++++++++++++
 .../stream/mqtt/IgniteMqttStreamerTest.java     |  50 ++++
 .../mqtt/IgniteMqttStreamerTestSuite.java       |  34 +++
 .../ignite/stream/mqtt/TestTupleExtractors.java |  28 +++
 pom.xml                                         |   1 +
 6 files changed, 466 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mqtt/pom.xml b/modules/mqtt/pom.xml
new file mode 100644
index 0000000..b108180
--- /dev/null
+++ b/modules/mqtt/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent</relativePath>
+    </parent>
+
+    <artifactId>ignite-mqtt</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <url>http://ignite.apache.org</url>
+
+    <properties>
+        <paho.version>1.0.2</paho.version>
+        <mosquette.version>0.7</mosquette.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>${paho.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.eclipse.moquette</groupId>
+            <artifactId>moquette-broker</artifactId>
+            <version>${mosquette.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-log4j</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-spring</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <!-- Repository for Mosquette (embedded MQTT broker for tests) and for Eclipse Paho (MQTT client) -->
+    <repositories>
+        <repository>
+            <id>bintray</id>
+            <url>http://dl.bintray.com/andsel/maven/</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+        <repository>
+            <id>Eclipse Paho Repo</id>
+            <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+</project>

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
new file mode 100644
index 0000000..00a89ab
--- /dev/null
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.stream.StreamAdapter;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/**
+ * Streamer that consumes from a MQTT topic and feeds key-value pairs into an {@link IgniteDataStreamer} instance,
+ * using Eclipse Paho as an MQTT client.
+ * <p>
+ * You must also provide a {@link StreamSingleTupleExtractor} or a {@link StreamMultipleTupleExtractor} to extract
+ * cache tuples out of the incoming message.
+ * <p>
+ * This Streamer has many features:
+ *
+ * <ul>
+ *     <li>Subscribing to a single topic or multiple topics at once.</li>
+ *     <li>Specifying the subscriber's QoS for a single topic or for multiple topics.</li>
+ *     <li>Allows setting {@link MqttConnectOptions} to support features like last will testament, persistent
+ *         sessions, etc.</li>
+ *     <li>Specifying the client ID.</li>
+ * </ul>
+ *
+ * Note: features like durable subscriptions, last will testament, etc. must be configured via the
+ * {@link #connectOptions} property.
+ *
+ * @author Raul Kripalani
+ */
+public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> implements MqttCallback {
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    private MqttClient client;
+
+    private String brokerUrl;
+
+    private String topic;
+
+    private Integer qualityOfService;
+
+    private List<String> topics;
+
+    private List<Integer> qualitiesOfService;
+
+    /** Client ID in case we're using durable subscribers. */
+    private String clientId;
+
+    private MqttClientPersistence persistence;
+
+    private MqttConnectOptions connectOptions;
+
+    // disconnect parameters
+    private Integer disconnectQuiesceTimeout;
+
+    private boolean disconnectForcibly;
+
+    private Integer disconnectForciblyTimeout;
+
+    private volatile boolean stopped = true;
+
+    /**
+     * Starts streamer.
+     *
+     * @throws IgniteException If failed.
+     */
+    public void start() throws IgniteException {
+        if (!stopped)
+            throw new IgniteException("Attempted to start an already started MQTT Streamer");
+
+        // for simplicity, if these are null initialize to empty lists
+        topics = topics == null ? new ArrayList<String>() : topics;
+        qualitiesOfService = qualitiesOfService == null ? new ArrayList<Integer>() : qualitiesOfService;
+
+        try {
+            // parameter validations
+            A.notNull(getStreamer(), "streamer");
+            A.notNull(getIgnite(), "ignite");
+            A.ensure(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null, "tuple extractor missing");
+            A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " +
+                "both single and multiple tuple extractor");
+            A.notNullOrEmpty(brokerUrl, "broker URL");
+            A.notNullOrEmpty(clientId, "client ID");
+
+            // if we have both a single topic and a list of topics, fail
+            if (topic != null && topic.length() > 0 && !topics.isEmpty())
+                throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time");
+
+            // if we have both a single QoS and list, fail
+            if (qualityOfService != null && !qualitiesOfService.isEmpty()) {
+                throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time");
+            }
+
+            // Paho API requires disconnect timeout if providing a quiesce timeout and disconnecting forcibly
+            if (disconnectForcibly && disconnectQuiesceTimeout != null) {
+                A.notNull(disconnectForciblyTimeout, "disconnect timeout cannot be null when disconnecting forcibly " +
+                    "with quiesce");
+            }
+
+            // if we have multiple topics
+            if (topics != null && !topics.isEmpty()) {
+                for (String t : topics) {
+                    A.notNullOrEmpty(t, "topic in list of topics");
+                }
+                A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(), "qualities of " +
+                    "service must be either empty or have the same size as topics list");
+            }
+
+            // create logger
+            log = getIgnite().log();
+
+            // create the mqtt client
+            if (persistence == null)
+                client = new MqttClient(brokerUrl, clientId);
+            else
+                client = new MqttClient(brokerUrl, clientId, persistence);
+
+            connectAndSubscribe();
+
+            stopped = false;
+
+        }
+        catch (Throwable t) {
+            throw new IgniteException("Exception while initializing MqttStreamer", t);
+        }
+
+    }
+
+    private void connectAndSubscribe() throws MqttException {
+        // connect
+        if (connectOptions != null)
+            client.connect();
+        else
+            client.connect(connectOptions);
+
+        // subscribe to multiple topics
+        if (!topics.isEmpty()) {
+            if (qualitiesOfService.isEmpty()) {
+                client.subscribe(topics.toArray(new String[0]));
+            } else {
+                int[] qoses = new int[qualitiesOfService.size()];
+                for (int i = 0; i < qualitiesOfService.size(); i++)
+                    qoses[i] = qualitiesOfService.get(i);
+
+                client.subscribe(topics.toArray(new String[0]), qoses);
+            }
+        } else {
+            // subscribe to a single topic
+            if (qualityOfService == null) {
+                client.subscribe(topic);
+            } else {
+                client.subscribe(topic, qualityOfService);
+            }
+        }
+    }
+
+    /**
+     * Stops streamer.
+     */
+    public void stop() throws IgniteException {
+        if (stopped)
+            throw new IgniteException("Attempted to stop an already stopped MQTT Streamer");
+
+        try {
+            if (disconnectForcibly) {
+                if (disconnectQuiesceTimeout == null && disconnectForciblyTimeout == null) {
+                    client.disconnectForcibly();
+                } else if (disconnectForciblyTimeout != null && disconnectQuiesceTimeout == null) {
+                    client.disconnectForcibly(disconnectForciblyTimeout);
+                } else {
+                    client.disconnectForcibly(disconnectQuiesceTimeout, disconnectForciblyTimeout);
+                }
+            } else {
+                if (disconnectQuiesceTimeout == null) {
+                    client.disconnect();
+                } else {
+                    client.disconnect(disconnectQuiesceTimeout);
+                }
+            }
+        }
+        catch (Throwable t) {
+            throw new IgniteException("Exception while stopping MqttStreamer", t);
+        }
+    }
+
+    @Override public void connectionLost(Throwable throwable) {
+        log.warning(String.format("MQTT Connection to server %s was lost due to", brokerUrl), throwable);
+        // TODO: handle reconnect attempts with an optional backoff mechanism (linear, exponential, finonacci)
+        try {
+            connectAndSubscribe();
+        }
+        catch (MqttException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override public void messageArrived(String topic, MqttMessage message) throws Exception {
+        if (getMultipleTupleExtractor() != null) {
+            Map<K, V> entries = getMultipleTupleExtractor().extract(message);
+            getStreamer().addData(entries);
+        } else {
+            Map.Entry<K, V> entry = getSingleTupleExtractor().extract(message);
+            getStreamer().addData(entry);
+        }
+    }
+
+    @Override public void deliveryComplete(IMqttDeliveryToken token) {
+        // ignore, we don't send messages
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
new file mode 100644
index 0000000..59730fa
--- /dev/null
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Test for {@link MqttStreamer}.
+ *
+ * @author Raul Kripalani
+ */
+public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
+
+    /** Constructor. */
+    public IgniteMqttStreamerTest() {
+        super(true);
+    }
+
+    @Before @SuppressWarnings("unchecked")
+    public void beforeTest() throws Exception {
+        grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+
+    }
+
+    @After
+    public void afterTest() throws Exception {
+        grid().cache(null).clear();
+
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
new file mode 100644
index 0000000..ff25145
--- /dev/null
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * MQTT streamer tests.
+ *
+ * @author Raul Kripalani
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    IgniteMqttStreamerTest.class
+})
+public class IgniteMqttStreamerTestSuite {
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
new file mode 100644
index 0000000..e2ed0f0
--- /dev/null
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+/**
+ * Test transformers for MqttStreamer tests.
+ *
+ * @author Raul Kripalani
+ */
+public class TestTupleExtractors {
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b47958f..c19a9b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
         <module>modules/kafka</module>
         <module>modules/yarn</module>
         <module>modules/jms11</module>
+        <module>modules/mqtt</module>
         <module>modules/zookeeper</module>
         <module>modules/platform</module>
     </modules>


[2/5] ignite git commit: IGNITE-535 (WIP) MQTT Streamer.

Posted by ra...@apache.org.
IGNITE-535 (WIP) MQTT Streamer.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53683e20
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53683e20
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53683e20

Branch: refs/heads/master
Commit: 53683e20d304dfd96d544f286e8460d3829598d8
Parents: 6b53f1b
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 16:23:28 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 16:23:28 2015 +0100

----------------------------------------------------------------------
 modules/mqtt/pom.xml                            |  39 +-
 .../apache/ignite/stream/mqtt/MqttStreamer.java | 336 +++++++++++---
 .../stream/mqtt/IgniteMqttStreamerTest.java     | 435 +++++++++++++++++++
 .../ignite/stream/mqtt/TestTupleExtractors.java |  28 --
 4 files changed, 741 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mqtt/pom.xml b/modules/mqtt/pom.xml
index b108180..4b0b46c 100644
--- a/modules/mqtt/pom.xml
+++ b/modules/mqtt/pom.xml
@@ -37,7 +37,8 @@
 
     <properties>
         <paho.version>1.0.2</paho.version>
-        <mosquette.version>0.7</mosquette.version>
+        <activemq.version>5.11.1</activemq.version>
+        <guava-retryier.version>2.0.0</guava-retryier.version>
     </properties>
 
     <dependencies>
@@ -54,9 +55,29 @@
         </dependency>
 
         <dependency>
-            <groupId>org.eclipse.moquette</groupId>
-            <artifactId>moquette-broker</artifactId>
-            <version>${mosquette.version}</version>
+            <groupId>com.github.rholder</groupId>
+            <artifactId>guava-retrying</artifactId>
+            <version>${guava-retryier.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-broker</artifactId>
+            <version>${activemq.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-kahadb-store</artifactId>
+            <version>${activemq.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-mqtt</artifactId>
+            <version>${activemq.version}</version>
             <scope>test</scope>
         </dependency>
 
@@ -86,16 +107,6 @@
     <!-- Repository for Mosquette (embedded MQTT broker for tests) and for Eclipse Paho (MQTT client) -->
     <repositories>
         <repository>
-            <id>bintray</id>
-            <url>http://dl.bintray.com/andsel/maven/</url>
-            <releases>
-                <enabled>true</enabled>
-            </releases>
-            <snapshots>
-                <enabled>false</enabled>
-            </snapshots>
-        </repository>
-        <repository>
             <id>Eclipse Paho Repo</id>
             <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
             <releases>

http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
index 00a89ab..b86d385 100644
--- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -20,6 +20,10 @@ package org.apache.ignite.stream.mqtt;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
@@ -29,12 +33,19 @@ import org.apache.ignite.stream.StreamAdapter;
 import org.apache.ignite.stream.StreamMultipleTupleExtractor;
 import org.apache.ignite.stream.StreamSingleTupleExtractor;
 
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.StopStrategy;
+import com.github.rholder.retry.WaitStrategies;
+import com.github.rholder.retry.WaitStrategy;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 
 /**
@@ -90,8 +101,20 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
 
     private Integer disconnectForciblyTimeout;
 
+    private WaitStrategy retryWaitStrategy = WaitStrategies.fibonacciWait();
+
+    private StopStrategy retryStopStrategy = StopStrategies.neverStop();
+
+    private MqttConnectionRetrier connectionRetrier;
+
     private volatile boolean stopped = true;
 
+    private volatile boolean connected;
+
+    private String cachedLogPrefix;
+
+    private boolean blockUntilConnected;
+
     /**
      * Starts streamer.
      *
@@ -109,18 +132,21 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
             // parameter validations
             A.notNull(getStreamer(), "streamer");
             A.notNull(getIgnite(), "ignite");
-            A.ensure(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null, "tuple extractor missing");
+            A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null), "tuple extractor missing");
             A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " +
                 "both single and multiple tuple extractor");
             A.notNullOrEmpty(brokerUrl, "broker URL");
             A.notNullOrEmpty(clientId, "client ID");
 
-            // if we have both a single topic and a list of topics, fail
-            if (topic != null && topic.length() > 0 && !topics.isEmpty())
+            // if we have both a single topic and a list of topics (but the list of topic is not of
+            // size 1 and == topic, as this would be a case of re-initialization), fail
+            if (topic != null && topic.length() > 0 && !topics.isEmpty() &&
+                topics.size() != 1 && !topics.get(0).equals(topic))
                 throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time");
 
-            // if we have both a single QoS and list, fail
-            if (qualityOfService != null && !qualitiesOfService.isEmpty()) {
+            // same as above but for QoS
+            if (qualityOfService != null && !qualitiesOfService.isEmpty() && qualitiesOfService.size() != 1 &&
+                !qualitiesOfService.get(0).equals(qualityOfService)) {
                 throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time");
             }
 
@@ -131,12 +157,22 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
             }
 
             // if we have multiple topics
-            if (topics != null && !topics.isEmpty()) {
-                for (String t : topics) {
+            if (!topics.isEmpty()) {
+                for (String t : topics)
                     A.notNullOrEmpty(t, "topic in list of topics");
-                }
+
                 A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(), "qualities of " +
                     "service must be either empty or have the same size as topics list");
+
+                cachedLogPrefix = "[" + Joiner.on(",").join(topics) + "]";
+            }
+            else {  // just the single topic
+                topics.add(topic);
+
+                if (qualityOfService != null)
+                    qualitiesOfService.add(qualityOfService);
+
+                cachedLogPrefix = "[" + topic + "]";
             }
 
             // create logger
@@ -148,10 +184,28 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
             else
                 client = new MqttClient(brokerUrl, clientId, persistence);
 
-            connectAndSubscribe();
+            // set this as a callback
+            client.setCallback(this);
 
+            // set stopped to false, as the connection will start async
             stopped = false;
 
+            // build retrier
+            Retryer<Boolean> retrier = RetryerBuilder.<Boolean>newBuilder()
+                .retryIfResult(new Predicate<Boolean>() {
+                    @Override public boolean apply(Boolean connected) {
+                        return !connected;
+                    }
+                })
+                .retryIfException().retryIfRuntimeException()
+                .withWaitStrategy(retryWaitStrategy)
+                .withStopStrategy(retryStopStrategy)
+                .build();
+
+            // create the connection retrier
+            connectionRetrier = new MqttConnectionRetrier(retrier);
+            connectionRetrier.connect();
+
         }
         catch (Throwable t) {
             throw new IgniteException("Exception while initializing MqttStreamer", t);
@@ -159,34 +213,6 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
 
     }
 
-    private void connectAndSubscribe() throws MqttException {
-        // connect
-        if (connectOptions != null)
-            client.connect();
-        else
-            client.connect(connectOptions);
-
-        // subscribe to multiple topics
-        if (!topics.isEmpty()) {
-            if (qualitiesOfService.isEmpty()) {
-                client.subscribe(topics.toArray(new String[0]));
-            } else {
-                int[] qoses = new int[qualitiesOfService.size()];
-                for (int i = 0; i < qualitiesOfService.size(); i++)
-                    qoses[i] = qualitiesOfService.get(i);
-
-                client.subscribe(topics.toArray(new String[0]), qoses);
-            }
-        } else {
-            // subscribe to a single topic
-            if (qualityOfService == null) {
-                client.subscribe(topic);
-            } else {
-                client.subscribe(topic, qualityOfService);
-            }
-        }
-    }
-
     /**
      * Stops streamer.
      */
@@ -194,50 +220,250 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
         if (stopped)
             throw new IgniteException("Attempted to stop an already stopped MQTT Streamer");
 
+        // stop the retrier
+        connectionRetrier.stop();
+
         try {
             if (disconnectForcibly) {
-                if (disconnectQuiesceTimeout == null && disconnectForciblyTimeout == null) {
+                if (disconnectQuiesceTimeout == null && disconnectForciblyTimeout == null)
                     client.disconnectForcibly();
-                } else if (disconnectForciblyTimeout != null && disconnectQuiesceTimeout == null) {
+
+                else if (disconnectForciblyTimeout != null && disconnectQuiesceTimeout == null)
                     client.disconnectForcibly(disconnectForciblyTimeout);
-                } else {
+
+                else
                     client.disconnectForcibly(disconnectQuiesceTimeout, disconnectForciblyTimeout);
-                }
+
             } else {
-                if (disconnectQuiesceTimeout == null) {
+                if (disconnectQuiesceTimeout == null)
                     client.disconnect();
-                } else {
+
+                else
                     client.disconnect(disconnectQuiesceTimeout);
-                }
+
             }
+
+            client.close();
+            connected = false;
+            stopped = true;
+
         }
         catch (Throwable t) {
             throw new IgniteException("Exception while stopping MqttStreamer", t);
         }
     }
 
+    // -------------------------------
+    //  MQTT Client callback methods
+    // -------------------------------
+
     @Override public void connectionLost(Throwable throwable) {
-        log.warning(String.format("MQTT Connection to server %s was lost due to", brokerUrl), throwable);
-        // TODO: handle reconnect attempts with an optional backoff mechanism (linear, exponential, finonacci)
-        try {
-            connectAndSubscribe();
-        }
-        catch (MqttException e) {
-            e.printStackTrace();
-        }
+        connected = false;
+
+        // if we have been stopped, we do not try to establish the connection again
+        if (stopped)
+            return;
+
+        log.warning(String.format("MQTT Connection to server %s was lost.", brokerUrl), throwable);
+        connectionRetrier.connect();
     }
 
     @Override public void messageArrived(String topic, MqttMessage message) throws Exception {
         if (getMultipleTupleExtractor() != null) {
             Map<K, V> entries = getMultipleTupleExtractor().extract(message);
+            if (log.isTraceEnabled()) {
+                log.trace("Adding cache entries: " + entries);
+            }
             getStreamer().addData(entries);
-        } else {
+        }
+        else {
             Map.Entry<K, V> entry = getSingleTupleExtractor().extract(message);
+            if (log.isTraceEnabled()) {
+                log.trace("Adding cache entry: " + entry);
+            }
             getStreamer().addData(entry);
         }
     }
 
     @Override public void deliveryComplete(IMqttDeliveryToken token) {
-        // ignore, we don't send messages
+        // ignore, as we don't send messages
+    }
+
+    // -------------------------------
+    //  Getters and setters
+    // -------------------------------
+
+    public String getBrokerUrl() {
+        return brokerUrl;
+    }
+
+    public void setBrokerUrl(String brokerUrl) {
+        this.brokerUrl = brokerUrl;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public Integer getQualityOfService() {
+        return qualityOfService;
+    }
+
+    public void setQualityOfService(Integer qualityOfService) {
+        this.qualityOfService = qualityOfService;
+    }
+
+    public List<String> getTopics() {
+        return topics;
+    }
+
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
+
+    public List<Integer> getQualitiesOfService() {
+        return qualitiesOfService;
+    }
+
+    public void setQualitiesOfService(List<Integer> qualitiesOfService) {
+        this.qualitiesOfService = qualitiesOfService;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public MqttClientPersistence getPersistence() {
+        return persistence;
+    }
+
+    public void setPersistence(MqttClientPersistence persistence) {
+        this.persistence = persistence;
+    }
+
+    public MqttConnectOptions getConnectOptions() {
+        return connectOptions;
+    }
+
+    public void setConnectOptions(MqttConnectOptions connectOptions) {
+        this.connectOptions = connectOptions;
+    }
+
+    public boolean isDisconnectForcibly() {
+        return disconnectForcibly;
+    }
+
+    public void setDisconnectForcibly(boolean disconnectForcibly) {
+        this.disconnectForcibly = disconnectForcibly;
+    }
+
+    public Integer getDisconnectQuiesceTimeout() {
+        return disconnectQuiesceTimeout;
+    }
+
+    public void setDisconnectQuiesceTimeout(Integer disconnectQuiesceTimeout) {
+        this.disconnectQuiesceTimeout = disconnectQuiesceTimeout;
+    }
+
+    public Integer getDisconnectForciblyTimeout() {
+        return disconnectForciblyTimeout;
+    }
+
+    public void setDisconnectForciblyTimeout(Integer disconnectForciblyTimeout) {
+        this.disconnectForciblyTimeout = disconnectForciblyTimeout;
     }
+
+    public WaitStrategy getRetryWaitStrategy() {
+        return retryWaitStrategy;
+    }
+
+    public void setRetryWaitStrategy(WaitStrategy retryWaitStrategy) {
+        this.retryWaitStrategy = retryWaitStrategy;
+    }
+
+    public StopStrategy getRetryStopStrategy() {
+        return retryStopStrategy;
+    }
+
+    public void setRetryStopStrategy(StopStrategy retryStopStrategy) {
+        this.retryStopStrategy = retryStopStrategy;
+    }
+
+    public boolean isBlockUntilConnected() {
+        return blockUntilConnected;
+    }
+
+    public void setBlockUntilConnected(boolean blockUntilConnected) {
+        this.blockUntilConnected = blockUntilConnected;
+    }
+
+    private class MqttConnectionRetrier {
+
+        private final Retryer<Boolean> retrier;
+        private ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        public MqttConnectionRetrier(Retryer<Boolean> retrier) {
+            this.retrier = retrier;
+        }
+
+        public void connect() {
+            Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>() {
+                @Override public Boolean call() throws Exception {
+                    // if we're already connected, return immediately
+                    if (connected)
+                        return true;
+
+                    if (stopped)
+                        return false;
+
+                    // connect to broker
+                    if (connectOptions == null)
+                        client.connect();
+                    else
+                        client.connect(connectOptions);
+
+                    // always use the multiple topics variant of the mqtt client; even if the user specified a single
+                    // topic and/or QoS, the initialization code would have placed it inside the 1..n structures
+                    if (qualitiesOfService.isEmpty())
+                        client.subscribe(topics.toArray(new String[0]));
+
+                    else {
+                        int[] qoses = new int[qualitiesOfService.size()];
+                        for (int i = 0; i < qualitiesOfService.size(); i++)
+                            qoses[i] = qualitiesOfService.get(i);
+
+                        client.subscribe(topics.toArray(new String[0]), qoses);
+                    }
+
+                    connected = true;
+                    return connected;
+                }
+            });
+
+            Future<Boolean> result = executor.submit(callable);
+
+            if (blockUntilConnected) {
+                try {
+                    result.get();
+                }
+                catch (Throwable e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        public void stop() {
+            executor.shutdownNow();
+        }
+
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
index 59730fa..012486a 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -17,11 +17,47 @@
 
 package org.apache.ignite.stream.mqtt;
 
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.util.lang.GridMapEntry;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import com.google.common.base.Splitter;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.junit.After;
 import org.junit.Before;
 
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
 /**
  * Test for {@link MqttStreamer}.
  *
@@ -29,6 +65,24 @@ import org.junit.Before;
  */
 public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
+    private static final Map<Integer, String> TEST_DATA = new HashMap<>();
+    private static final String SINGLE_TOPIC_NAME = "abc";
+    private static final List<String> MULTIPLE_TOPIC_NAMES = Arrays.asList("def", "ghi", "jkl", "mno");
+
+    private BrokerService broker;
+    private MqttClient client;
+    private String brokerUrl;
+    private int port;
+    private MqttStreamer<Integer, String> streamer;
+    private UUID remoteListener;
+
+    static {
+        for (int i = 0; i < 100; i++)
+            TEST_DATA.put(i, "v" + i);
+    }
+
+    private IgniteDataStreamer<Integer, String> dataStreamer;
+
     /** Constructor. */
     public IgniteMqttStreamerTest() {
         super(true);
@@ -38,13 +92,394 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     public void beforeTest() throws Exception {
         grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
 
+        // find an available local port
+        try (ServerSocket ss = new ServerSocket(0)) {
+            port = ss.getLocalPort();
+        }
+
+        // create the broker
+        broker = new BrokerService();
+        broker.deleteAllMessages();
+        broker.setPersistent(false);
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setQueuePrefetch(1);
+        broker.setDestinationPolicy(policyMap);
+        broker.getDestinationPolicy().setDefaultEntry(policy);
+
+        // add the MQTT transport connector to the broker
+        broker.addConnector("mqtt://localhost:" + port);
+        broker.setStartAsync(false);
+        broker.start(true);
+
+        // create the broker URL
+        brokerUrl = "tcp://localhost:" + port;
+
+        // create the client and connect
+        client = new MqttClient(brokerUrl, UUID.randomUUID().toString(), new MemoryPersistence());
+        client.connect();
+
+        // create mqtt streamer
+        dataStreamer = grid().dataStreamer(null);
+        streamer = createMqttStreamer(dataStreamer);
     }
 
     @After
     public void afterTest() throws Exception {
+        try {
+            streamer.stop();
+        }
+        catch (Exception e) {
+            // ignore if already stopped
+        }
+
+        dataStreamer.close();
+
         grid().cache(null).clear();
 
+        broker.stop();
+        broker.deleteAllMessages();
+
+    }
+
+    public void testSingleTopic_NoQoS_OneEntryPerMessage() throws Exception {
+        // configure streamer
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setTopic(SINGLE_TOPIC_NAME);
+
+        // subscribe to cache PUT events
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // action time
+        streamer.start();
+
+        // send messages
+        sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+    }
+
+    public void testMultipleTopics_NoQoS_OneEntryPerMessage() throws Exception {
+        // configure streamer
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setTopics(MULTIPLE_TOPIC_NAMES);
+
+        // subscribe to cache PUT events
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // action time
+        streamer.start();
+
+        // send messages
+        sendMessages(MULTIPLE_TOPIC_NAMES, 0, 50, false);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+
+        assertTrue(broker.getBroker().getDestinationMap().size() >= 4);
+        assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("def")));
+        assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
+    }
+
+    public void testSingleTopic_NoQoS_MultipleEntriesOneMessage() throws Exception {
+        // configure streamer
+        streamer.setMultipleTupleExtractor(multipleTupleExtractor());
+        streamer.setTopic(SINGLE_TOPIC_NAME);
+
+        // subscribe to cache PUT events
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // action time
+        streamer.start();
+
+        // send messages
+        sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, true);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+    }
+
+    public void testMultipleTopics_NoQoS_MultipleEntriesOneMessage() throws Exception {
+        // configure streamer
+        streamer.setMultipleTupleExtractor(multipleTupleExtractor());
+        streamer.setTopics(MULTIPLE_TOPIC_NAMES);
+
+        // subscribe to cache PUT events
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // action time
+        streamer.start();
+
+        // send messages
+        sendMessages(MULTIPLE_TOPIC_NAMES, 0, 50, true);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+
+        assertTrue(broker.getBroker().getDestinationMap().size() >= 4);
+        assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("def")));
+        assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
+    }
+
+    public void testSingleTopic_NoQoS_ConnectOptions_Durable() throws Exception {
+        // configure streamer
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setTopic(SINGLE_TOPIC_NAME);
+
+        MqttConnectOptions connOptions = new MqttConnectOptions();
+        connOptions.setCleanSession(false);
+        streamer.setConnectOptions(connOptions);
+
+        // subscribe to cache PUT events
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // action time
+        streamer.start();
+
+        // send messages
+        sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+
+        // explicitly stop the streamer
+        streamer.stop();
+
+        // send messages while stopped
+        sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false);
+
+        latch = subscribeToPutEvents(50);
+
+        // start the streamer again
+        streamer.start();
+
+        // assertions - make sure that messages sent during disconnection were also received
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(100);
+    }
+
+    public void testSingleTopic_NoQoS_Reconnect() throws Exception {
+        // configure streamer
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setRetryWaitStrategy(WaitStrategies.noWait());
+        streamer.setRetryStopStrategy(StopStrategies.neverStop());
+        streamer.setTopic(SINGLE_TOPIC_NAME);
+
+        // subscribe to cache PUT events
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // action time
+        streamer.start();
+
+        // send messages
+        sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+
+        // now shutdown the broker, wait 2 seconds and start it again
+        broker.stop();
+        broker.start(true);
+        broker.waitUntilStarted();
+        Thread.sleep(2000);
+        client.connect();
+
+        // let's ensure we have 2 connections: Ignite and our test
+        assertEquals(2, broker.getTransportConnectorByScheme("mqtt").getConnections().size());
+
+        // subscribe to cache PUT events again
+        latch = subscribeToPutEvents(50);
+
+        // send messages
+        sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(100);
+    }
+
+    public void testSingleTopic_NoQoS_RetryOnce() throws Exception {
+        // configure streamer
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setRetryWaitStrategy(WaitStrategies.noWait());
+        streamer.setRetryStopStrategy(StopStrategies.stopAfterAttempt(1));
+        streamer.setTopic(SINGLE_TOPIC_NAME);
+
+        // subscribe to cache PUT events
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // action time
+        streamer.start();
+
+        // send messages
+        sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+
+        // now shutdown the broker, wait 2 seconds and start it again
+        broker.stop();
+        broker.start(true);
+        broker.waitUntilStarted();
+        client.connect();
+
+        // lets send messages and ensure they are not received, because our retrier desisted
+        sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false);
+        Thread.sleep(3000);
+        assertNull(grid().cache(null).get(50));
+
+    }
+
+    public void testMultipleTopics_MultipleQoS_OneEntryPerMessage() throws Exception {
+        // configure streamer
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setTopics(MULTIPLE_TOPIC_NAMES);
+        streamer.setQualitiesOfService(Arrays.asList(1, 1, 1, 1));
+
+        // subscribe to cache PUT events
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // action time
+        streamer.start();
+
+        // send messages
+        sendMessages(MULTIPLE_TOPIC_NAMES, 0, 50, false);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+
+        assertTrue(broker.getBroker().getDestinationMap().size() >= 4);
+        assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("def")));
+        assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
+    }
+
+    public void testMultipleTopics_MultipleQoS_Mismatch() throws Exception {
+        // configure streamer
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setTopics(MULTIPLE_TOPIC_NAMES);
+        streamer.setQualitiesOfService(Arrays.asList(1, 1, 1));
+
+        try {
+            streamer.start();
+        }
+        catch (Exception e) {
+            return;
+        }
+        fail("Expected an exception reporting invalid parameters");
+
+    }
+
+    private MqttStreamer<Integer, String> createMqttStreamer(IgniteDataStreamer<Integer, String> dataStreamer) {
+        MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
+        streamer.setIgnite(grid());
+        streamer.setStreamer(dataStreamer);
+        streamer.setBrokerUrl(brokerUrl);
+        streamer.setClientId(UUID.randomUUID().toString());
+        streamer.setBlockUntilConnected(true);
+
+        dataStreamer.allowOverwrite(true);
+        dataStreamer.autoFlushFrequency(1);
+
+        return streamer;
+    }
+
+    public void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
+        if (singleMessage) {
+            final List<StringBuilder> sbs = new ArrayList<>(topics.size());
+            // initialize String Builders for each topic
+            F.forEach(topics, new IgniteInClosure<String>() {
+                @Override public void apply(String s) {
+                    sbs.add(new StringBuilder());
+                }
+            });
+            // fill String Builders for each topic
+            F.forEach(F.range(fromIdx, fromIdx + count), new IgniteInClosure<Integer>() {
+                @Override public void apply(Integer integer) {
+                    sbs.get(integer % topics.size()).append(integer.toString() + "," + TEST_DATA.get(integer) + "\n");
+                }
+            });
+            // send each buffer out
+            for (int i = 0; i < topics.size(); i++) {
+                MqttMessage msg = new MqttMessage(sbs.get(i).toString().getBytes());
+                client.publish(topics.get(i % topics.size()), msg);
+            }
+        }
+        else {
+            for (int i = fromIdx; i < fromIdx + count; i++) {
+                byte[] payload = (i + "," + TEST_DATA.get(i)).getBytes();
+                MqttMessage msg = new MqttMessage(payload);
+                client.publish(topics.get(i % topics.size()), msg);
+            }
+        }
+    }
+
+    private CountDownLatch subscribeToPutEvents(int expect) {
+        Ignite ignite = grid();
+
+        // Listen to cache PUT events and expect as many as messages as test data items
+        final CountDownLatch latch = new CountDownLatch(expect);
+        @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> callback = new IgniteBiPredicate<UUID, CacheEvent>() {
+            @Override public boolean apply(UUID uuid, CacheEvent evt) {
+                latch.countDown();
+                return true;
+            }
+        };
+
+        remoteListener = ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(callback, null, EVT_CACHE_OBJECT_PUT);
+        return latch;
+    }
+
+    private void assertCacheEntriesLoaded(int count) {
+        // get the cache and check that the entries are present
+        IgniteCache<Integer, String> cache = grid().cache(null);
+
+        // for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache
+        for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count)) {
+            assertEquals(TEST_DATA.get(key), cache.get(key));
+        }
+
+        // assert that the cache exactly the specified amount of elements
+        assertEquals(count, cache.size(CachePeekMode.ALL));
+
+        // remove the event listener
+        grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener);
+    }
+
+    public static StreamSingleTupleExtractor<MqttMessage, Integer, String> singleTupleExtractor() {
+        return new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
+            @Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
+                List<String> s = Splitter.on(",").splitToList(new String(msg.getPayload()));
+                return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
+            }
+        };
+    }
 
+    public static StreamMultipleTupleExtractor<MqttMessage, Integer, String> multipleTupleExtractor() {
+        return new StreamMultipleTupleExtractor<MqttMessage, Integer, String>() {
+            @Override public Map<Integer, String> extract(MqttMessage msg) {
+                final Map<String, String> map = Splitter.on("\n")
+                    .omitEmptyStrings()
+                    .withKeyValueSeparator(",")
+                    .split(new String(msg.getPayload()));
+                final Map<Integer, String> answer = new HashMap<>();
+                F.forEach(map.keySet(), new IgniteInClosure<String>() {
+                    @Override public void apply(String s) {
+                        answer.put(Integer.parseInt(s), map.get(s));
+                    }
+                });
+                return answer;
+            }
+        };
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
deleted file mode 100644
index e2ed0f0..0000000
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.mqtt;
-
-/**
- * Test transformers for MqttStreamer tests.
- *
- * @author Raul Kripalani
- */
-public class TestTupleExtractors {
-
-
-}
\ No newline at end of file


[4/5] ignite git commit: IGNITE-535 Finish MQTT Streamer docs and tests. Upgrade latter to AMQ 5.12.0.

Posted by ra...@apache.org.
IGNITE-535 Finish MQTT Streamer docs and tests. Upgrade latter to AMQ 5.12.0.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/296dd6e7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/296dd6e7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/296dd6e7

Branch: refs/heads/master
Commit: 296dd6e7d86fe6d0914a9fbf8062632c04e4d22c
Parents: f03f3a3
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 17:24:44 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 17:24:44 2015 +0100

----------------------------------------------------------------------
 modules/mqtt/pom.xml                            |   9 +-
 .../apache/ignite/stream/mqtt/MqttStreamer.java | 156 ++++++++++++++++++-
 .../stream/mqtt/IgniteMqttStreamerTest.java     |  80 +++++++++-
 3 files changed, 224 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mqtt/pom.xml b/modules/mqtt/pom.xml
index 4b0b46c..21511e8 100644
--- a/modules/mqtt/pom.xml
+++ b/modules/mqtt/pom.xml
@@ -37,7 +37,7 @@
 
     <properties>
         <paho.version>1.0.2</paho.version>
-        <activemq.version>5.11.1</activemq.version>
+        <activemq.version>5.12.0</activemq.version>
         <guava-retryier.version>2.0.0</guava-retryier.version>
     </properties>
 
@@ -69,13 +69,6 @@
 
         <dependency>
             <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-kahadb-store</artifactId>
-            <version>${activemq.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.activemq</groupId>
             <artifactId>activemq-mqtt</artifactId>
             <version>${activemq.version}</version>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
index b86d385..f18ae42 100644
--- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -62,12 +62,17 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
  *     <li>Specifying the subscriber's QoS for a single topic or for multiple topics.</li>
  *     <li>Allows setting {@link MqttConnectOptions} to support features like last will testament, persistent
  *         sessions, etc.</li>
- *     <li>Specifying the client ID.</li>
+ *     <li>Specifying the client ID. A random one will be generated and maintained throughout reconnections if the user
+ *         does not provide one.</li>
+ *     <li>(Re-)Connection retries based on the <i>guava-retrying</i> library. Retry wait and retry stop policies
+ *         can be configured.</li>
+ *     <li>Blocking the start() method until connected for the first time.</li>
  * </ul>
  *
- * Note: features like durable subscriptions, last will testament, etc. must be configured via the
+ * Note: features like durable subscriptions, last will testament, etc. can be configured via the
  * {@link #connectOptions} property.
  *
+ * @see <a href="https://github.com/rholder/guava-retrying">guava-retrying library</a>
  * @author Raul Kripalani
  */
 public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> implements MqttCallback {
@@ -75,46 +80,65 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
     /** Logger. */
     private IgniteLogger log;
 
+    /** The MQTT client object for internal use. */
     private MqttClient client;
 
+    /** The broker URL, set by the user. */
     private String brokerUrl;
 
+    /** The topic to subscribe to, if a single topic. */
     private String topic;
 
+    /** The quality of service to use for a single topic subscription (optional). */
     private Integer qualityOfService;
 
+    /** The topics to subscribe to, if many. */
     private List<String> topics;
 
+    /** The qualities of service to use for multiple topic subscriptions. If specified, it must contain the same
+     *  number of elements as {@link #topics}. */
     private List<Integer> qualitiesOfService;
 
-    /** Client ID in case we're using durable subscribers. */
+    /** The MQTT client ID (optional). */
     private String clientId;
 
+    /** A configurable persistence mechanism. If not set, Paho will use its default. */
     private MqttClientPersistence persistence;
 
+    /** The MQTT client connect options, where users can configured the last will and testament, durability, etc. */
     private MqttConnectOptions connectOptions;
 
-    // disconnect parameters
+    /** Quiesce timeout on disconnection. */
     private Integer disconnectQuiesceTimeout;
 
+    /** Whether to disconnect forcibly or not. */
     private boolean disconnectForcibly;
 
+    /** If disconnecting forcibly, the timeout. */
     private Integer disconnectForciblyTimeout;
 
+    /** The strategy to determine how long to wait between retry attempts. By default, this streamer uses a
+     *  Fibonacci-based strategy. */
     private WaitStrategy retryWaitStrategy = WaitStrategies.fibonacciWait();
 
+    /** The strategy to determine when to stop retrying to (re-)connect. By default, we never stop. */
     private StopStrategy retryStopStrategy = StopStrategies.neverStop();
 
+    /** The internal connection retrier object with a thread pool of size 1. */
     private MqttConnectionRetrier connectionRetrier;
 
+    /** Whether to block the start() method until connected for the first time. */
+    private boolean blockUntilConnected;
+
+    /** State keeping. */
     private volatile boolean stopped = true;
 
+    /** State keeping. */
     private volatile boolean connected;
 
+    /** Cached log prefix for cache messages. */
     private String cachedLogPrefix;
 
-    private boolean blockUntilConnected;
-
     /**
      * Starts streamer.
      *
@@ -136,7 +160,11 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
             A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " +
                 "both single and multiple tuple extractor");
             A.notNullOrEmpty(brokerUrl, "broker URL");
-            A.notNullOrEmpty(clientId, "client ID");
+
+            // if the client ID is empty, generate one
+            if (clientId == null || clientId.length() == 0) {
+                clientId = MqttClient.generateClientId();
+            }
 
             // if we have both a single topic and a list of topics (but the list of topic is not of
             // size 1 and == topic, as this would be a case of re-initialization), fail
@@ -257,6 +285,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
     //  MQTT Client callback methods
     // -------------------------------
 
+    /**
+     * {@inheritDoc}
+     */
     @Override public void connectionLost(Throwable throwable) {
         connected = false;
 
@@ -268,6 +299,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
         connectionRetrier.connect();
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override public void messageArrived(String topic, MqttMessage message) throws Exception {
         if (getMultipleTupleExtractor() != null) {
             Map<K, V> entries = getMultipleTupleExtractor().extract(message);
@@ -285,6 +319,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override public void deliveryComplete(IMqttDeliveryToken token) {
         // ignore, as we don't send messages
     }
@@ -293,127 +330,229 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
     //  Getters and setters
     // -------------------------------
 
+    /**
+     * @return
+     */
     public String getBrokerUrl() {
         return brokerUrl;
     }
 
+    /**
+     * @param brokerUrl The Broker URL (compulsory).
+     */
     public void setBrokerUrl(String brokerUrl) {
         this.brokerUrl = brokerUrl;
     }
 
+    /**
+     * @return
+     */
     public String getTopic() {
         return topic;
     }
 
+    /**
+     * @param topic The topic to subscribe to, if a single topic.
+     */
     public void setTopic(String topic) {
         this.topic = topic;
     }
 
+    /**
+     * @return
+     */
     public Integer getQualityOfService() {
         return qualityOfService;
     }
 
+    /**
+     * @param qualityOfService The quality of service to use for a single topic subscription (optional).
+     */
     public void setQualityOfService(Integer qualityOfService) {
         this.qualityOfService = qualityOfService;
     }
 
+    /**
+     * @return
+     */
     public List<String> getTopics() {
         return topics;
     }
 
+    /**
+     * @param topics The topics to subscribe to, if many.
+     */
     public void setTopics(List<String> topics) {
         this.topics = topics;
     }
 
+    /**
+     * @return
+     */
     public List<Integer> getQualitiesOfService() {
         return qualitiesOfService;
     }
 
+    /**
+     * @param qualitiesOfService The qualities of service to use for multiple topic subscriptions.
+     * If specified, the list must contain the same number of elements as {@link #topics}.
+     */
     public void setQualitiesOfService(List<Integer> qualitiesOfService) {
         this.qualitiesOfService = qualitiesOfService;
     }
 
+    /**
+     * @return
+     */
     public String getClientId() {
         return clientId;
     }
 
+    /**
+     * @param clientId The MQTT client ID (optional). If one is not provided, we'll create one for you and maintain
+     * it througout any reconnection attempts.
+     */
     public void setClientId(String clientId) {
         this.clientId = clientId;
     }
 
+    /**
+     * @return
+     */
     public MqttClientPersistence getPersistence() {
         return persistence;
     }
 
+    /**
+     * @param persistence A configurable persistence mechanism. If not set, Paho will use its default.
+     */
     public void setPersistence(MqttClientPersistence persistence) {
         this.persistence = persistence;
     }
 
+    /**
+     * @return
+     */
     public MqttConnectOptions getConnectOptions() {
         return connectOptions;
     }
 
+    /**
+     * @param connectOptions The MQTT client connect options, where users can configured the last will and testament, durability, etc.
+     */
     public void setConnectOptions(MqttConnectOptions connectOptions) {
         this.connectOptions = connectOptions;
     }
 
+    /**
+     * @return
+     */
     public boolean isDisconnectForcibly() {
         return disconnectForcibly;
     }
 
+    /**
+     * @param disconnectForcibly Whether to disconnect forcibly or not. By default, it's false.
+     */
     public void setDisconnectForcibly(boolean disconnectForcibly) {
         this.disconnectForcibly = disconnectForcibly;
     }
 
+    /**
+     * @return
+     */
     public Integer getDisconnectQuiesceTimeout() {
         return disconnectQuiesceTimeout;
     }
 
+    /**
+     * @param disconnectQuiesceTimeout Quiesce timeout on disconnection. If not provided, this streamer won't use any.
+     */
     public void setDisconnectQuiesceTimeout(Integer disconnectQuiesceTimeout) {
         this.disconnectQuiesceTimeout = disconnectQuiesceTimeout;
     }
 
+    /**
+     * @return
+     */
     public Integer getDisconnectForciblyTimeout() {
         return disconnectForciblyTimeout;
     }
 
+    /**
+     * @param disconnectForciblyTimeout If disconnecting forcibly, the timeout. Compulsory in that case.
+     */
     public void setDisconnectForciblyTimeout(Integer disconnectForciblyTimeout) {
         this.disconnectForciblyTimeout = disconnectForciblyTimeout;
     }
 
+    /**
+     * @return
+     */
     public WaitStrategy getRetryWaitStrategy() {
         return retryWaitStrategy;
     }
 
+    /**
+     * @param retryWaitStrategy The strategy to determine how long to wait between retry attempts.
+     * By default, this streamer uses a Fibonacci-based strategy.
+     */
     public void setRetryWaitStrategy(WaitStrategy retryWaitStrategy) {
         this.retryWaitStrategy = retryWaitStrategy;
     }
 
+    /**
+     * @return
+     */
     public StopStrategy getRetryStopStrategy() {
         return retryStopStrategy;
     }
 
+    /**
+     * @param retryStopStrategy The strategy to determine when to stop retrying to (re-)connect. By default, we never stop.
+     */
     public void setRetryStopStrategy(StopStrategy retryStopStrategy) {
         this.retryStopStrategy = retryStopStrategy;
     }
 
+    /**
+     * @return
+     */
     public boolean isBlockUntilConnected() {
         return blockUntilConnected;
     }
 
+    /**
+     * @param blockUntilConnected Whether to block the start() method until connected for the first time. By default,
+     * false.
+     */
     public void setBlockUntilConnected(boolean blockUntilConnected) {
         this.blockUntilConnected = blockUntilConnected;
     }
 
+    /**
+     * A utility class to help us with (re-)connecting to the MQTT broker. It uses a single-threaded executor to perform
+     * the (re-)connections.
+     */
     private class MqttConnectionRetrier {
 
+        /** The guava-retrying retrier object. */
         private final Retryer<Boolean> retrier;
+
+        /** Single-threaded pool. */
         private ExecutorService executor = Executors.newSingleThreadExecutor();
 
+        /**
+         * Constructor.
+         * @param retrier The retryier object.
+         */
         public MqttConnectionRetrier(Retryer<Boolean> retrier) {
             this.retrier = retrier;
         }
 
+        /**
+         * Method that is called by the streamer to ask us to (re-)connect.
+         */
         public void connect() {
             Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>() {
                 @Override public Boolean call() throws Exception {
@@ -460,6 +599,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
             }
         }
 
+        /**
+         * Stops this connection utility class by shutting down the thread pool.
+         */
         public void stop() {
             executor.shutdownNow();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
index 012486a..5ac7339 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -65,24 +65,41 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
  */
 public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
+    /** The test data. */
     private static final Map<Integer, String> TEST_DATA = new HashMap<>();
+
+    /** Topic name for single topic tests. */
     private static final String SINGLE_TOPIC_NAME = "abc";
+
+    /** Topic names for multiple topic tests. */
     private static final List<String> MULTIPLE_TOPIC_NAMES = Arrays.asList("def", "ghi", "jkl", "mno");
 
+    /** The AMQ broker with an MQTT interface. */
     private BrokerService broker;
+
+    /** The MQTT client. */
     private MqttClient client;
+
+    /** The broker URL. */
     private String brokerUrl;
+
+    /** The broker port. **/
     private int port;
+
+    /** The MQTT streamer currently under test. */
     private MqttStreamer<Integer, String> streamer;
+
+    /** The UUID of the currently active remote listener. */
     private UUID remoteListener;
 
+    /** The Ignite data streamer. */
+    private IgniteDataStreamer<Integer, String> dataStreamer;
+
     static {
         for (int i = 0; i < 100; i++)
             TEST_DATA.put(i, "v" + i);
     }
 
-    private IgniteDataStreamer<Integer, String> dataStreamer;
-
     /** Constructor. */
     public IgniteMqttStreamerTest() {
         super(true);
@@ -99,14 +116,17 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
         // create the broker
         broker = new BrokerService();
-        broker.deleteAllMessages();
+        broker.setDeleteAllMessagesOnStartup(true);
         broker.setPersistent(false);
+        broker.setPersistenceAdapter(null);
+        broker.setPersistenceFactory(null);
 
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry policy = new PolicyEntry();
         policy.setQueuePrefetch(1);
         broker.setDestinationPolicy(policyMap);
         broker.getDestinationPolicy().setDefaultEntry(policy);
+        broker.setSchedulerSupport(false);
 
         // add the MQTT transport connector to the broker
         broker.addConnector("mqtt://localhost:" + port);
@@ -143,6 +163,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
     }
 
+    /**
+     * @throws Exception
+     */
     public void testSingleTopic_NoQoS_OneEntryPerMessage() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -162,6 +185,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertCacheEntriesLoaded(50);
     }
 
+    /**
+     * @throws Exception
+     */
     public void testMultipleTopics_NoQoS_OneEntryPerMessage() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -185,6 +211,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
     }
 
+    /**
+     * @throws Exception
+     */
     public void testSingleTopic_NoQoS_MultipleEntriesOneMessage() throws Exception {
         // configure streamer
         streamer.setMultipleTupleExtractor(multipleTupleExtractor());
@@ -204,6 +233,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertCacheEntriesLoaded(50);
     }
 
+    /**
+     * @throws Exception
+     */
     public void testMultipleTopics_NoQoS_MultipleEntriesOneMessage() throws Exception {
         // configure streamer
         streamer.setMultipleTupleExtractor(multipleTupleExtractor());
@@ -227,6 +259,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
     }
 
+    /**
+     * @throws Exception
+     */
     public void testSingleTopic_NoQoS_ConnectOptions_Durable() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -265,6 +300,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertCacheEntriesLoaded(100);
     }
 
+    /**
+     * @throws Exception
+     */
     public void testSingleTopic_NoQoS_Reconnect() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -306,6 +344,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertCacheEntriesLoaded(100);
     }
 
+    /**
+     * @throws Exception
+     */
     public void testSingleTopic_NoQoS_RetryOnce() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -339,6 +380,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
     }
 
+    /**
+     * @throws Exception
+     */
     public void testMultipleTopics_MultipleQoS_OneEntryPerMessage() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -363,6 +407,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
     }
 
+    /**
+     * @throws Exception
+     */
     public void testMultipleTopics_MultipleQoS_Mismatch() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -379,6 +426,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
     }
 
+    /**
+     * @throws Exception
+     */
     private MqttStreamer<Integer, String> createMqttStreamer(IgniteDataStreamer<Integer, String> dataStreamer) {
         MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
         streamer.setIgnite(grid());
@@ -393,7 +443,10 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         return streamer;
     }
 
-    public void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
+    /**
+     * @throws Exception
+     */
+    private void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
         if (singleMessage) {
             final List<StringBuilder> sbs = new ArrayList<>(topics.size());
             // initialize String Builders for each topic
@@ -423,6 +476,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         }
     }
 
+    /**
+     * @throws Exception
+     */
     private CountDownLatch subscribeToPutEvents(int expect) {
         Ignite ignite = grid();
 
@@ -439,14 +495,16 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         return latch;
     }
 
+    /**
+     * @throws Exception
+     */
     private void assertCacheEntriesLoaded(int count) {
         // get the cache and check that the entries are present
         IgniteCache<Integer, String> cache = grid().cache(null);
 
         // for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache
-        for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count)) {
+        for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count))
             assertEquals(TEST_DATA.get(key), cache.get(key));
-        }
 
         // assert that the cache exactly the specified amount of elements
         assertEquals(count, cache.size(CachePeekMode.ALL));
@@ -455,6 +513,11 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener);
     }
 
+    /**
+     * Returns a {@link StreamSingleTupleExtractor} for testing.
+     *
+     * @throws Exception
+     */
     public static StreamSingleTupleExtractor<MqttMessage, Integer, String> singleTupleExtractor() {
         return new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
             @Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
@@ -464,6 +527,11 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         };
     }
 
+    /**
+     * Returns a {@link StreamMultipleTupleExtractor} for testing.
+     *
+     * @throws Exception
+     */
     public static StreamMultipleTupleExtractor<MqttMessage, Integer, String> multipleTupleExtractor() {
         return new StreamMultipleTupleExtractor<MqttMessage, Integer, String>() {
             @Override public Map<Integer, String> extract(MqttMessage msg) {


[5/5] ignite git commit: IGNITE-535 Merge MQTT Streamer into master.

Posted by ra...@apache.org.
IGNITE-535 Merge MQTT Streamer into master.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/88acd318
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/88acd318
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/88acd318

Branch: refs/heads/master
Commit: 88acd318b84ce3bff8c061bb34718e0e5f7127fb
Parents: 421a523 296dd6e
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 17:26:04 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 17:26:04 2015 +0100

----------------------------------------------------------------------
 modules/mqtt/pom.xml                            | 114 ++++
 .../apache/ignite/stream/mqtt/MqttStreamer.java | 611 +++++++++++++++++++
 .../stream/mqtt/IgniteMqttStreamerTest.java     | 553 +++++++++++++++++
 .../mqtt/IgniteMqttStreamerTestSuite.java       |  34 ++
 pom.xml                                         |   1 +
 5 files changed, 1313 insertions(+)
----------------------------------------------------------------------



[3/5] ignite git commit: Merge branch 'master' into feature/ignite-535-mqtt

Posted by ra...@apache.org.
Merge branch 'master' into feature/ignite-535-mqtt


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f03f3a3b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f03f3a3b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f03f3a3b

Branch: refs/heads/master
Commit: f03f3a3b48fa105f318e9493440671188770f4ef
Parents: 53683e2 421a523
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 16:36:53 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 16:36:53 2015 +0100

----------------------------------------------------------------------
 .../org/apache/ignite/IgniteAtomicLong.java     |  15 +-
 .../apache/ignite/IgniteAtomicReference.java    |   9 +-
 .../org/apache/ignite/IgniteAtomicSequence.java |   9 +-
 .../org/apache/ignite/IgniteAtomicStamped.java  |  13 +-
 .../configuration/NearCacheConfiguration.java   |  18 +-
 .../apache/ignite/internal/IgniteKernal.java    |   7 -
 .../processors/cache/GridCacheContext.java      |   6 +-
 .../cache/GridCacheEvictionManager.java         |   6 +-
 .../cache/GridCacheEvictionResponse.java        |   2 +-
 .../processors/cache/GridCacheIoManager.java    |  47 ++--
 .../processors/cache/GridCacheMessage.java      |   7 +
 .../processors/cache/GridCacheMvccManager.java  |  34 ++-
 .../GridCachePartitionExchangeManager.java      |  41 +++-
 .../processors/cache/GridCacheProcessor.java    |  28 ++-
 .../GridDistributedLockResponse.java            |   6 +-
 .../GridDistributedTxPrepareResponse.java       |   6 +-
 .../distributed/dht/GridDhtLocalPartition.java  |  26 +-
 .../distributed/dht/GridDhtTopologyFuture.java  |   6 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   2 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   2 +-
 .../dht/atomic/GridDhtAtomicUpdateResponse.java |  12 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  16 +-
 .../dht/atomic/GridNearAtomicUpdateRequest.java |   2 +
 .../atomic/GridNearAtomicUpdateResponse.java    |  11 +-
 .../colocated/GridDhtColocatedLockFuture.java   |  44 +++-
 .../dht/preloader/GridDhtForceKeysFuture.java   |   2 +-
 .../dht/preloader/GridDhtForceKeysResponse.java |   6 +-
 .../GridDhtPartitionsExchangeFuture.java        |  19 +-
 .../distributed/near/GridNearGetResponse.java   |   6 +-
 .../distributed/near/GridNearLockFuture.java    |  26 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |  20 +-
 .../near/GridNearTxFinishResponse.java          |   6 +-
 .../query/GridCacheDistributedQueryFuture.java  |  27 +-
 .../cache/query/GridCacheLocalQueryFuture.java  |   5 +
 .../cache/query/GridCacheQueryAdapter.java      | 170 ++++++++-----
 .../query/GridCacheQueryFutureAdapter.java      |  11 +-
 .../cache/query/GridCacheQueryManager.java      |  30 ++-
 .../cache/query/GridCacheQueryResponse.java     |   6 +-
 .../continuous/CacheContinuousQueryHandler.java |  12 +-
 .../transactions/IgniteTxLocalAdapter.java      |   4 +-
 .../ignite/internal/util/GridSpinBusyLock.java  |  10 +
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  12 +-
 .../IgniteCacheEntryProcessorNodeJoinTest.java  |  24 +-
 .../distributed/CacheAffEarlySelfTest.java      | 245 +++++++++++++++++++
 .../GridCacheSwapScanQueryAbstractSelfTest.java | 118 ++++-----
 .../processors/igfs/IgfsAbstractSelfTest.java   |   5 +-
 .../loadtests/hashmap/GridCacheTestContext.java |   4 +-
 .../ignite/testframework/GridTestUtils.java     |  14 +-
 .../cache/CacheIndexStreamerTest.java           | 137 +++++++++++
 ...CacheScanPartitionQueryFallbackSelfTest.java | 244 +++++-------------
 .../IgniteCacheQueryNodeRestartSelfTest2.java   |   2 -
 .../Apache.Ignite.Core/Impl/IgniteManager.cs    |   2 -
 .../ignite/visor/commands/VisorConsole.scala    |  37 ++-
 .../config/benchmark-put-indexed-val.properties |   2 +-
 modules/yardstick/config/ignite-base-config.xml |   2 +-
 .../yardstick/IgniteBenchmarkArguments.java     |  24 +-
 .../org/apache/ignite/yardstick/IgniteNode.java |  12 +-
 57 files changed, 1084 insertions(+), 535 deletions(-)
----------------------------------------------------------------------