You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ra...@apache.org on 2015/09/21 18:26:25 UTC
[1/5] ignite git commit: IGNITE-535 (WIP) Implement MQTT Streamer.
Repository: ignite
Updated Branches:
refs/heads/master 421a5234b -> 88acd318b
IGNITE-535 (WIP) Implement MQTT Streamer.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6b53f1bb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6b53f1bb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6b53f1bb
Branch: refs/heads/master
Commit: 6b53f1bb2699ae7a6ea8ae277de1bfc3ecbd7b6a
Parents: b80b171
Author: Raul Kripalani <ra...@apache.org>
Authored: Tue Sep 15 00:45:58 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Tue Sep 15 22:30:09 2015 +0100
----------------------------------------------------------------------
modules/mqtt/pom.xml | 110 +++++++++
.../apache/ignite/stream/mqtt/MqttStreamer.java | 243 +++++++++++++++++++
.../stream/mqtt/IgniteMqttStreamerTest.java | 50 ++++
.../mqtt/IgniteMqttStreamerTestSuite.java | 34 +++
.../ignite/stream/mqtt/TestTupleExtractors.java | 28 +++
pom.xml | 1 +
6 files changed, 466 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mqtt/pom.xml b/modules/mqtt/pom.xml
new file mode 100644
index 0000000..b108180
--- /dev/null
+++ b/modules/mqtt/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <artifactId>ignite-mqtt</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <url>http://ignite.apache.org</url>
+
+ <properties>
+ <paho.version>1.0.2</paho.version>
+ <mosquette.version>0.7</mosquette.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.paho</groupId>
+ <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+ <version>${paho.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.moquette</groupId>
+ <artifactId>moquette-broker</artifactId>
+ <version>${mosquette.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-log4j</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spring</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <!-- Repository for Mosquette (embedded MQTT broker for tests) and for Eclipse Paho (MQTT client) -->
+ <repositories>
+ <repository>
+ <id>bintray</id>
+ <url>http://dl.bintray.com/andsel/maven/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ <repository>
+ <id>Eclipse Paho Repo</id>
+ <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+</project>
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
new file mode 100644
index 0000000..00a89ab
--- /dev/null
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.stream.StreamAdapter;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/**
+ * Streamer that consumes from a MQTT topic and feeds key-value pairs into an {@link IgniteDataStreamer} instance,
+ * using Eclipse Paho as an MQTT client.
+ * <p>
+ * You must also provide a {@link StreamSingleTupleExtractor} or a {@link StreamMultipleTupleExtractor} to extract
+ * cache tuples out of the incoming message.
+ * <p>
+ * This Streamer has many features:
+ *
+ * <ul>
+ * <li>Subscribing to a single topic or multiple topics at once.</li>
+ * <li>Specifying the subscriber's QoS for a single topic or for multiple topics.</li>
+ * <li>Allows setting {@link MqttConnectOptions} to support features like last will testament, persistent
+ * sessions, etc.</li>
+ * <li>Specifying the client ID.</li>
+ * </ul>
+ *
+ * Note: features like durable subscriptions, last will testament, etc. must be configured via the
+ * {@link #connectOptions} property.
+ *
+ * @author Raul Kripalani
+ */
+public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> implements MqttCallback {
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ private MqttClient client;
+
+ private String brokerUrl;
+
+ private String topic;
+
+ private Integer qualityOfService;
+
+ private List<String> topics;
+
+ private List<Integer> qualitiesOfService;
+
+ /** Client ID in case we're using durable subscribers. */
+ private String clientId;
+
+ private MqttClientPersistence persistence;
+
+ private MqttConnectOptions connectOptions;
+
+ // disconnect parameters
+ private Integer disconnectQuiesceTimeout;
+
+ private boolean disconnectForcibly;
+
+ private Integer disconnectForciblyTimeout;
+
+ private volatile boolean stopped = true;
+
+ /**
+ * Starts streamer.
+ *
+ * @throws IgniteException If failed.
+ */
+ public void start() throws IgniteException {
+ if (!stopped)
+ throw new IgniteException("Attempted to start an already started MQTT Streamer");
+
+ // for simplicity, if these are null initialize to empty lists
+ topics = topics == null ? new ArrayList<String>() : topics;
+ qualitiesOfService = qualitiesOfService == null ? new ArrayList<Integer>() : qualitiesOfService;
+
+ try {
+ // parameter validations
+ A.notNull(getStreamer(), "streamer");
+ A.notNull(getIgnite(), "ignite");
+ A.ensure(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null, "tuple extractor missing");
+ A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " +
+ "both single and multiple tuple extractor");
+ A.notNullOrEmpty(brokerUrl, "broker URL");
+ A.notNullOrEmpty(clientId, "client ID");
+
+ // if we have both a single topic and a list of topics, fail
+ if (topic != null && topic.length() > 0 && !topics.isEmpty())
+ throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time");
+
+ // if we have both a single QoS and list, fail
+ if (qualityOfService != null && !qualitiesOfService.isEmpty()) {
+ throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time");
+ }
+
+ // Paho API requires disconnect timeout if providing a quiesce timeout and disconnecting forcibly
+ if (disconnectForcibly && disconnectQuiesceTimeout != null) {
+ A.notNull(disconnectForciblyTimeout, "disconnect timeout cannot be null when disconnecting forcibly " +
+ "with quiesce");
+ }
+
+ // if we have multiple topics
+ if (topics != null && !topics.isEmpty()) {
+ for (String t : topics) {
+ A.notNullOrEmpty(t, "topic in list of topics");
+ }
+ A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(), "qualities of " +
+ "service must be either empty or have the same size as topics list");
+ }
+
+ // create logger
+ log = getIgnite().log();
+
+ // create the mqtt client
+ if (persistence == null)
+ client = new MqttClient(brokerUrl, clientId);
+ else
+ client = new MqttClient(brokerUrl, clientId, persistence);
+
+ connectAndSubscribe();
+
+ stopped = false;
+
+ }
+ catch (Throwable t) {
+ throw new IgniteException("Exception while initializing MqttStreamer", t);
+ }
+
+ }
+
+ private void connectAndSubscribe() throws MqttException {
+ // connect
+ if (connectOptions != null)
+ client.connect();
+ else
+ client.connect(connectOptions);
+
+ // subscribe to multiple topics
+ if (!topics.isEmpty()) {
+ if (qualitiesOfService.isEmpty()) {
+ client.subscribe(topics.toArray(new String[0]));
+ } else {
+ int[] qoses = new int[qualitiesOfService.size()];
+ for (int i = 0; i < qualitiesOfService.size(); i++)
+ qoses[i] = qualitiesOfService.get(i);
+
+ client.subscribe(topics.toArray(new String[0]), qoses);
+ }
+ } else {
+ // subscribe to a single topic
+ if (qualityOfService == null) {
+ client.subscribe(topic);
+ } else {
+ client.subscribe(topic, qualityOfService);
+ }
+ }
+ }
+
+ /**
+ * Stops streamer.
+ */
+ public void stop() throws IgniteException {
+ if (stopped)
+ throw new IgniteException("Attempted to stop an already stopped MQTT Streamer");
+
+ try {
+ if (disconnectForcibly) {
+ if (disconnectQuiesceTimeout == null && disconnectForciblyTimeout == null) {
+ client.disconnectForcibly();
+ } else if (disconnectForciblyTimeout != null && disconnectQuiesceTimeout == null) {
+ client.disconnectForcibly(disconnectForciblyTimeout);
+ } else {
+ client.disconnectForcibly(disconnectQuiesceTimeout, disconnectForciblyTimeout);
+ }
+ } else {
+ if (disconnectQuiesceTimeout == null) {
+ client.disconnect();
+ } else {
+ client.disconnect(disconnectQuiesceTimeout);
+ }
+ }
+ }
+ catch (Throwable t) {
+ throw new IgniteException("Exception while stopping MqttStreamer", t);
+ }
+ }
+
+ @Override public void connectionLost(Throwable throwable) {
+ log.warning(String.format("MQTT Connection to server %s was lost due to", brokerUrl), throwable);
+ // TODO: handle reconnect attempts with an optional backoff mechanism (linear, exponential, finonacci)
+ try {
+ connectAndSubscribe();
+ }
+ catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override public void messageArrived(String topic, MqttMessage message) throws Exception {
+ if (getMultipleTupleExtractor() != null) {
+ Map<K, V> entries = getMultipleTupleExtractor().extract(message);
+ getStreamer().addData(entries);
+ } else {
+ Map.Entry<K, V> entry = getSingleTupleExtractor().extract(message);
+ getStreamer().addData(entry);
+ }
+ }
+
+ @Override public void deliveryComplete(IMqttDeliveryToken token) {
+ // ignore, we don't send messages
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
new file mode 100644
index 0000000..59730fa
--- /dev/null
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Test for {@link MqttStreamer}.
+ *
+ * @author Raul Kripalani
+ */
+public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
+
+ /** Constructor. */
+ public IgniteMqttStreamerTest() {
+ super(true);
+ }
+
+ @Before @SuppressWarnings("unchecked")
+ public void beforeTest() throws Exception {
+ grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+
+ }
+
+ @After
+ public void afterTest() throws Exception {
+ grid().cache(null).clear();
+
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
new file mode 100644
index 0000000..ff25145
--- /dev/null
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * MQTT streamer tests.
+ *
+ * @author Raul Kripalani
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ IgniteMqttStreamerTest.class
+})
+public class IgniteMqttStreamerTestSuite {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
new file mode 100644
index 0000000..e2ed0f0
--- /dev/null
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+/**
+ * Test transformers for MqttStreamer tests.
+ *
+ * @author Raul Kripalani
+ */
+public class TestTupleExtractors {
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b47958f..c19a9b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
<module>modules/kafka</module>
<module>modules/yarn</module>
<module>modules/jms11</module>
+ <module>modules/mqtt</module>
<module>modules/zookeeper</module>
<module>modules/platform</module>
</modules>
[2/5] ignite git commit: IGNITE-535 (WIP) MQTT Streamer.
Posted by ra...@apache.org.
IGNITE-535 (WIP) MQTT Streamer.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53683e20
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53683e20
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53683e20
Branch: refs/heads/master
Commit: 53683e20d304dfd96d544f286e8460d3829598d8
Parents: 6b53f1b
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 16:23:28 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 16:23:28 2015 +0100
----------------------------------------------------------------------
modules/mqtt/pom.xml | 39 +-
.../apache/ignite/stream/mqtt/MqttStreamer.java | 336 +++++++++++---
.../stream/mqtt/IgniteMqttStreamerTest.java | 435 +++++++++++++++++++
.../ignite/stream/mqtt/TestTupleExtractors.java | 28 --
4 files changed, 741 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mqtt/pom.xml b/modules/mqtt/pom.xml
index b108180..4b0b46c 100644
--- a/modules/mqtt/pom.xml
+++ b/modules/mqtt/pom.xml
@@ -37,7 +37,8 @@
<properties>
<paho.version>1.0.2</paho.version>
- <mosquette.version>0.7</mosquette.version>
+ <activemq.version>5.11.1</activemq.version>
+ <guava-retryier.version>2.0.0</guava-retryier.version>
</properties>
<dependencies>
@@ -54,9 +55,29 @@
</dependency>
<dependency>
- <groupId>org.eclipse.moquette</groupId>
- <artifactId>moquette-broker</artifactId>
- <version>${mosquette.version}</version>
+ <groupId>com.github.rholder</groupId>
+ <artifactId>guava-retrying</artifactId>
+ <version>${guava-retryier.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <version>${activemq.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-kahadb-store</artifactId>
+ <version>${activemq.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-mqtt</artifactId>
+ <version>${activemq.version}</version>
<scope>test</scope>
</dependency>
@@ -86,16 +107,6 @@
<!-- Repository for Mosquette (embedded MQTT broker for tests) and for Eclipse Paho (MQTT client) -->
<repositories>
<repository>
- <id>bintray</id>
- <url>http://dl.bintray.com/andsel/maven/</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- <repository>
<id>Eclipse Paho Repo</id>
<url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
<releases>
http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
index 00a89ab..b86d385 100644
--- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -20,6 +20,10 @@ package org.apache.ignite.stream.mqtt;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
@@ -29,12 +33,19 @@ import org.apache.ignite.stream.StreamAdapter;
import org.apache.ignite.stream.StreamMultipleTupleExtractor;
import org.apache.ignite.stream.StreamSingleTupleExtractor;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.StopStrategy;
+import com.github.rholder.retry.WaitStrategies;
+import com.github.rholder.retry.WaitStrategy;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
@@ -90,8 +101,20 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
private Integer disconnectForciblyTimeout;
+ private WaitStrategy retryWaitStrategy = WaitStrategies.fibonacciWait();
+
+ private StopStrategy retryStopStrategy = StopStrategies.neverStop();
+
+ private MqttConnectionRetrier connectionRetrier;
+
private volatile boolean stopped = true;
+ private volatile boolean connected;
+
+ private String cachedLogPrefix;
+
+ private boolean blockUntilConnected;
+
/**
* Starts streamer.
*
@@ -109,18 +132,21 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
// parameter validations
A.notNull(getStreamer(), "streamer");
A.notNull(getIgnite(), "ignite");
- A.ensure(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null, "tuple extractor missing");
+ A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null), "tuple extractor missing");
A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " +
"both single and multiple tuple extractor");
A.notNullOrEmpty(brokerUrl, "broker URL");
A.notNullOrEmpty(clientId, "client ID");
- // if we have both a single topic and a list of topics, fail
- if (topic != null && topic.length() > 0 && !topics.isEmpty())
+ // if we have both a single topic and a list of topics (but the list of topic is not of
+ // size 1 and == topic, as this would be a case of re-initialization), fail
+ if (topic != null && topic.length() > 0 && !topics.isEmpty() &&
+ topics.size() != 1 && !topics.get(0).equals(topic))
throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time");
- // if we have both a single QoS and list, fail
- if (qualityOfService != null && !qualitiesOfService.isEmpty()) {
+ // same as above but for QoS
+ if (qualityOfService != null && !qualitiesOfService.isEmpty() && qualitiesOfService.size() != 1 &&
+ !qualitiesOfService.get(0).equals(qualityOfService)) {
throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time");
}
@@ -131,12 +157,22 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
}
// if we have multiple topics
- if (topics != null && !topics.isEmpty()) {
- for (String t : topics) {
+ if (!topics.isEmpty()) {
+ for (String t : topics)
A.notNullOrEmpty(t, "topic in list of topics");
- }
+
A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(), "qualities of " +
"service must be either empty or have the same size as topics list");
+
+ cachedLogPrefix = "[" + Joiner.on(",").join(topics) + "]";
+ }
+ else { // just the single topic
+ topics.add(topic);
+
+ if (qualityOfService != null)
+ qualitiesOfService.add(qualityOfService);
+
+ cachedLogPrefix = "[" + topic + "]";
}
// create logger
@@ -148,10 +184,28 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
else
client = new MqttClient(brokerUrl, clientId, persistence);
- connectAndSubscribe();
+ // set this as a callback
+ client.setCallback(this);
+ // set stopped to false, as the connection will start async
stopped = false;
+ // build retrier
+ Retryer<Boolean> retrier = RetryerBuilder.<Boolean>newBuilder()
+ .retryIfResult(new Predicate<Boolean>() {
+ @Override public boolean apply(Boolean connected) {
+ return !connected;
+ }
+ })
+ .retryIfException().retryIfRuntimeException()
+ .withWaitStrategy(retryWaitStrategy)
+ .withStopStrategy(retryStopStrategy)
+ .build();
+
+ // create the connection retrier
+ connectionRetrier = new MqttConnectionRetrier(retrier);
+ connectionRetrier.connect();
+
}
catch (Throwable t) {
throw new IgniteException("Exception while initializing MqttStreamer", t);
@@ -159,34 +213,6 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
}
- private void connectAndSubscribe() throws MqttException {
- // connect
- if (connectOptions != null)
- client.connect();
- else
- client.connect(connectOptions);
-
- // subscribe to multiple topics
- if (!topics.isEmpty()) {
- if (qualitiesOfService.isEmpty()) {
- client.subscribe(topics.toArray(new String[0]));
- } else {
- int[] qoses = new int[qualitiesOfService.size()];
- for (int i = 0; i < qualitiesOfService.size(); i++)
- qoses[i] = qualitiesOfService.get(i);
-
- client.subscribe(topics.toArray(new String[0]), qoses);
- }
- } else {
- // subscribe to a single topic
- if (qualityOfService == null) {
- client.subscribe(topic);
- } else {
- client.subscribe(topic, qualityOfService);
- }
- }
- }
-
/**
* Stops streamer.
*/
@@ -194,50 +220,250 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
if (stopped)
throw new IgniteException("Attempted to stop an already stopped MQTT Streamer");
+ // stop the retrier
+ connectionRetrier.stop();
+
try {
if (disconnectForcibly) {
- if (disconnectQuiesceTimeout == null && disconnectForciblyTimeout == null) {
+ if (disconnectQuiesceTimeout == null && disconnectForciblyTimeout == null)
client.disconnectForcibly();
- } else if (disconnectForciblyTimeout != null && disconnectQuiesceTimeout == null) {
+
+ else if (disconnectForciblyTimeout != null && disconnectQuiesceTimeout == null)
client.disconnectForcibly(disconnectForciblyTimeout);
- } else {
+
+ else
client.disconnectForcibly(disconnectQuiesceTimeout, disconnectForciblyTimeout);
- }
+
} else {
- if (disconnectQuiesceTimeout == null) {
+ if (disconnectQuiesceTimeout == null)
client.disconnect();
- } else {
+
+ else
client.disconnect(disconnectQuiesceTimeout);
- }
+
}
+
+ client.close();
+ connected = false;
+ stopped = true;
+
}
catch (Throwable t) {
throw new IgniteException("Exception while stopping MqttStreamer", t);
}
}
+ // -------------------------------
+ // MQTT Client callback methods
+ // -------------------------------
+
@Override public void connectionLost(Throwable throwable) {
- log.warning(String.format("MQTT Connection to server %s was lost due to", brokerUrl), throwable);
- // TODO: handle reconnect attempts with an optional backoff mechanism (linear, exponential, finonacci)
- try {
- connectAndSubscribe();
- }
- catch (MqttException e) {
- e.printStackTrace();
- }
+ connected = false;
+
+ // if we have been stopped, we do not try to establish the connection again
+ if (stopped)
+ return;
+
+ log.warning(String.format("MQTT Connection to server %s was lost.", brokerUrl), throwable);
+ connectionRetrier.connect();
}
@Override public void messageArrived(String topic, MqttMessage message) throws Exception {
if (getMultipleTupleExtractor() != null) {
Map<K, V> entries = getMultipleTupleExtractor().extract(message);
+ if (log.isTraceEnabled()) {
+ log.trace("Adding cache entries: " + entries);
+ }
getStreamer().addData(entries);
- } else {
+ }
+ else {
Map.Entry<K, V> entry = getSingleTupleExtractor().extract(message);
+ if (log.isTraceEnabled()) {
+ log.trace("Adding cache entry: " + entry);
+ }
getStreamer().addData(entry);
}
}
@Override public void deliveryComplete(IMqttDeliveryToken token) {
- // ignore, we don't send messages
+ // ignore, as we don't send messages
+ }
+
+ // -------------------------------
+ // Getters and setters
+ // -------------------------------
+
+ public String getBrokerUrl() {
+ return brokerUrl;
+ }
+
+ public void setBrokerUrl(String brokerUrl) {
+ this.brokerUrl = brokerUrl;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public Integer getQualityOfService() {
+ return qualityOfService;
+ }
+
+ public void setQualityOfService(Integer qualityOfService) {
+ this.qualityOfService = qualityOfService;
+ }
+
+ public List<String> getTopics() {
+ return topics;
+ }
+
+ public void setTopics(List<String> topics) {
+ this.topics = topics;
+ }
+
+ public List<Integer> getQualitiesOfService() {
+ return qualitiesOfService;
+ }
+
+ public void setQualitiesOfService(List<Integer> qualitiesOfService) {
+ this.qualitiesOfService = qualitiesOfService;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public MqttClientPersistence getPersistence() {
+ return persistence;
+ }
+
+ public void setPersistence(MqttClientPersistence persistence) {
+ this.persistence = persistence;
+ }
+
+ public MqttConnectOptions getConnectOptions() {
+ return connectOptions;
+ }
+
+ public void setConnectOptions(MqttConnectOptions connectOptions) {
+ this.connectOptions = connectOptions;
+ }
+
+ public boolean isDisconnectForcibly() {
+ return disconnectForcibly;
+ }
+
+ public void setDisconnectForcibly(boolean disconnectForcibly) {
+ this.disconnectForcibly = disconnectForcibly;
+ }
+
+ public Integer getDisconnectQuiesceTimeout() {
+ return disconnectQuiesceTimeout;
+ }
+
+ public void setDisconnectQuiesceTimeout(Integer disconnectQuiesceTimeout) {
+ this.disconnectQuiesceTimeout = disconnectQuiesceTimeout;
+ }
+
+ public Integer getDisconnectForciblyTimeout() {
+ return disconnectForciblyTimeout;
+ }
+
+ public void setDisconnectForciblyTimeout(Integer disconnectForciblyTimeout) {
+ this.disconnectForciblyTimeout = disconnectForciblyTimeout;
}
+
+ public WaitStrategy getRetryWaitStrategy() {
+ return retryWaitStrategy;
+ }
+
+ public void setRetryWaitStrategy(WaitStrategy retryWaitStrategy) {
+ this.retryWaitStrategy = retryWaitStrategy;
+ }
+
+ public StopStrategy getRetryStopStrategy() {
+ return retryStopStrategy;
+ }
+
+ public void setRetryStopStrategy(StopStrategy retryStopStrategy) {
+ this.retryStopStrategy = retryStopStrategy;
+ }
+
+ public boolean isBlockUntilConnected() {
+ return blockUntilConnected;
+ }
+
+ public void setBlockUntilConnected(boolean blockUntilConnected) {
+ this.blockUntilConnected = blockUntilConnected;
+ }
+
+ private class MqttConnectionRetrier {
+
+ private final Retryer<Boolean> retrier;
+ private ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ public MqttConnectionRetrier(Retryer<Boolean> retrier) {
+ this.retrier = retrier;
+ }
+
+ public void connect() {
+ Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ // if we're already connected, return immediately
+ if (connected)
+ return true;
+
+ if (stopped)
+ return false;
+
+ // connect to broker
+ if (connectOptions == null)
+ client.connect();
+ else
+ client.connect(connectOptions);
+
+ // always use the multiple topics variant of the mqtt client; even if the user specified a single
+ // topic and/or QoS, the initialization code would have placed it inside the 1..n structures
+ if (qualitiesOfService.isEmpty())
+ client.subscribe(topics.toArray(new String[0]));
+
+ else {
+ int[] qoses = new int[qualitiesOfService.size()];
+ for (int i = 0; i < qualitiesOfService.size(); i++)
+ qoses[i] = qualitiesOfService.get(i);
+
+ client.subscribe(topics.toArray(new String[0]), qoses);
+ }
+
+ connected = true;
+ return connected;
+ }
+ });
+
+ Future<Boolean> result = executor.submit(callable);
+
+ if (blockUntilConnected) {
+ try {
+ result.get();
+ }
+ catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public void stop() {
+ executor.shutdownNow();
+ }
+
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
index 59730fa..012486a 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -17,11 +17,47 @@
package org.apache.ignite.stream.mqtt;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.util.lang.GridMapEntry;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import com.google.common.base.Splitter;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.After;
import org.junit.Before;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
/**
* Test for {@link MqttStreamer}.
*
@@ -29,6 +65,24 @@ import org.junit.Before;
*/
public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
+ private static final Map<Integer, String> TEST_DATA = new HashMap<>();
+ private static final String SINGLE_TOPIC_NAME = "abc";
+ private static final List<String> MULTIPLE_TOPIC_NAMES = Arrays.asList("def", "ghi", "jkl", "mno");
+
+ private BrokerService broker;
+ private MqttClient client;
+ private String brokerUrl;
+ private int port;
+ private MqttStreamer<Integer, String> streamer;
+ private UUID remoteListener;
+
+ static {
+ for (int i = 0; i < 100; i++)
+ TEST_DATA.put(i, "v" + i);
+ }
+
+ private IgniteDataStreamer<Integer, String> dataStreamer;
+
/** Constructor. */
public IgniteMqttStreamerTest() {
super(true);
@@ -38,13 +92,394 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
public void beforeTest() throws Exception {
grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+ // find an available local port
+ try (ServerSocket ss = new ServerSocket(0)) {
+ port = ss.getLocalPort();
+ }
+
+ // create the broker
+ broker = new BrokerService();
+ broker.deleteAllMessages();
+ broker.setPersistent(false);
+
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry policy = new PolicyEntry();
+ policy.setQueuePrefetch(1);
+ broker.setDestinationPolicy(policyMap);
+ broker.getDestinationPolicy().setDefaultEntry(policy);
+
+ // add the MQTT transport connector to the broker
+ broker.addConnector("mqtt://localhost:" + port);
+ broker.setStartAsync(false);
+ broker.start(true);
+
+ // create the broker URL
+ brokerUrl = "tcp://localhost:" + port;
+
+ // create the client and connect
+ client = new MqttClient(brokerUrl, UUID.randomUUID().toString(), new MemoryPersistence());
+ client.connect();
+
+ // create mqtt streamer
+ dataStreamer = grid().dataStreamer(null);
+ streamer = createMqttStreamer(dataStreamer);
}
@After
public void afterTest() throws Exception {
+ try {
+ streamer.stop();
+ }
+ catch (Exception e) {
+ // ignore if already stopped
+ }
+
+ dataStreamer.close();
+
grid().cache(null).clear();
+ broker.stop();
+ broker.deleteAllMessages();
+
+ }
+
+ public void testSingleTopic_NoQoS_OneEntryPerMessage() throws Exception {
+ // configure streamer
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setTopic(SINGLE_TOPIC_NAME);
+
+ // subscribe to cache PUT events
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // action time
+ streamer.start();
+
+ // send messages
+ sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+ }
+
+ public void testMultipleTopics_NoQoS_OneEntryPerMessage() throws Exception {
+ // configure streamer
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setTopics(MULTIPLE_TOPIC_NAMES);
+
+ // subscribe to cache PUT events
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // action time
+ streamer.start();
+
+ // send messages
+ sendMessages(MULTIPLE_TOPIC_NAMES, 0, 50, false);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+
+ assertTrue(broker.getBroker().getDestinationMap().size() >= 4);
+ assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("def")));
+ assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
+ }
+
+ public void testSingleTopic_NoQoS_MultipleEntriesOneMessage() throws Exception {
+ // configure streamer
+ streamer.setMultipleTupleExtractor(multipleTupleExtractor());
+ streamer.setTopic(SINGLE_TOPIC_NAME);
+
+ // subscribe to cache PUT events
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // action time
+ streamer.start();
+
+ // send messages
+ sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, true);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+ }
+
+ public void testMultipleTopics_NoQoS_MultipleEntriesOneMessage() throws Exception {
+ // configure streamer
+ streamer.setMultipleTupleExtractor(multipleTupleExtractor());
+ streamer.setTopics(MULTIPLE_TOPIC_NAMES);
+
+ // subscribe to cache PUT events
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // action time
+ streamer.start();
+
+ // send messages
+ sendMessages(MULTIPLE_TOPIC_NAMES, 0, 50, true);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+
+ assertTrue(broker.getBroker().getDestinationMap().size() >= 4);
+ assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("def")));
+ assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
+ }
+
+ public void testSingleTopic_NoQoS_ConnectOptions_Durable() throws Exception {
+ // configure streamer
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setTopic(SINGLE_TOPIC_NAME);
+
+ MqttConnectOptions connOptions = new MqttConnectOptions();
+ connOptions.setCleanSession(false);
+ streamer.setConnectOptions(connOptions);
+
+ // subscribe to cache PUT events
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // action time
+ streamer.start();
+
+ // send messages
+ sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+
+ // explicitly stop the streamer
+ streamer.stop();
+
+ // send messages while stopped
+ sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false);
+
+ latch = subscribeToPutEvents(50);
+
+ // start the streamer again
+ streamer.start();
+
+ // assertions - make sure that messages sent during disconnection were also received
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(100);
+ }
+
+ public void testSingleTopic_NoQoS_Reconnect() throws Exception {
+ // configure streamer
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setRetryWaitStrategy(WaitStrategies.noWait());
+ streamer.setRetryStopStrategy(StopStrategies.neverStop());
+ streamer.setTopic(SINGLE_TOPIC_NAME);
+
+ // subscribe to cache PUT events
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // action time
+ streamer.start();
+
+ // send messages
+ sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+
+ // now shutdown the broker, wait 2 seconds and start it again
+ broker.stop();
+ broker.start(true);
+ broker.waitUntilStarted();
+ Thread.sleep(2000);
+ client.connect();
+
+ // let's ensure we have 2 connections: Ignite and our test
+ assertEquals(2, broker.getTransportConnectorByScheme("mqtt").getConnections().size());
+
+ // subscribe to cache PUT events again
+ latch = subscribeToPutEvents(50);
+
+ // send messages
+ sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(100);
+ }
+
+ public void testSingleTopic_NoQoS_RetryOnce() throws Exception {
+ // configure streamer
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setRetryWaitStrategy(WaitStrategies.noWait());
+ streamer.setRetryStopStrategy(StopStrategies.stopAfterAttempt(1));
+ streamer.setTopic(SINGLE_TOPIC_NAME);
+
+ // subscribe to cache PUT events
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // action time
+ streamer.start();
+
+ // send messages
+ sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+
+ // now shutdown the broker, wait 2 seconds and start it again
+ broker.stop();
+ broker.start(true);
+ broker.waitUntilStarted();
+ client.connect();
+
+ // lets send messages and ensure they are not received, because our retrier desisted
+ sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false);
+ Thread.sleep(3000);
+ assertNull(grid().cache(null).get(50));
+
+ }
+
+ public void testMultipleTopics_MultipleQoS_OneEntryPerMessage() throws Exception {
+ // configure streamer
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setTopics(MULTIPLE_TOPIC_NAMES);
+ streamer.setQualitiesOfService(Arrays.asList(1, 1, 1, 1));
+
+ // subscribe to cache PUT events
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // action time
+ streamer.start();
+
+ // send messages
+ sendMessages(MULTIPLE_TOPIC_NAMES, 0, 50, false);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+
+ assertTrue(broker.getBroker().getDestinationMap().size() >= 4);
+ assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("def")));
+ assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
+ }
+
+ public void testMultipleTopics_MultipleQoS_Mismatch() throws Exception {
+ // configure streamer
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setTopics(MULTIPLE_TOPIC_NAMES);
+ streamer.setQualitiesOfService(Arrays.asList(1, 1, 1));
+
+ try {
+ streamer.start();
+ }
+ catch (Exception e) {
+ return;
+ }
+ fail("Expected an exception reporting invalid parameters");
+
+ }
+
+ private MqttStreamer<Integer, String> createMqttStreamer(IgniteDataStreamer<Integer, String> dataStreamer) {
+ MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
+ streamer.setIgnite(grid());
+ streamer.setStreamer(dataStreamer);
+ streamer.setBrokerUrl(brokerUrl);
+ streamer.setClientId(UUID.randomUUID().toString());
+ streamer.setBlockUntilConnected(true);
+
+ dataStreamer.allowOverwrite(true);
+ dataStreamer.autoFlushFrequency(1);
+
+ return streamer;
+ }
+
+ public void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
+ if (singleMessage) {
+ final List<StringBuilder> sbs = new ArrayList<>(topics.size());
+ // initialize String Builders for each topic
+ F.forEach(topics, new IgniteInClosure<String>() {
+ @Override public void apply(String s) {
+ sbs.add(new StringBuilder());
+ }
+ });
+ // fill String Builders for each topic
+ F.forEach(F.range(fromIdx, fromIdx + count), new IgniteInClosure<Integer>() {
+ @Override public void apply(Integer integer) {
+ sbs.get(integer % topics.size()).append(integer.toString() + "," + TEST_DATA.get(integer) + "\n");
+ }
+ });
+ // send each buffer out
+ for (int i = 0; i < topics.size(); i++) {
+ MqttMessage msg = new MqttMessage(sbs.get(i).toString().getBytes());
+ client.publish(topics.get(i % topics.size()), msg);
+ }
+ }
+ else {
+ for (int i = fromIdx; i < fromIdx + count; i++) {
+ byte[] payload = (i + "," + TEST_DATA.get(i)).getBytes();
+ MqttMessage msg = new MqttMessage(payload);
+ client.publish(topics.get(i % topics.size()), msg);
+ }
+ }
+ }
+
+ private CountDownLatch subscribeToPutEvents(int expect) {
+ Ignite ignite = grid();
+
+ // Listen to cache PUT events and expect as many as messages as test data items
+ final CountDownLatch latch = new CountDownLatch(expect);
+ @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> callback = new IgniteBiPredicate<UUID, CacheEvent>() {
+ @Override public boolean apply(UUID uuid, CacheEvent evt) {
+ latch.countDown();
+ return true;
+ }
+ };
+
+ remoteListener = ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(callback, null, EVT_CACHE_OBJECT_PUT);
+ return latch;
+ }
+
+ private void assertCacheEntriesLoaded(int count) {
+ // get the cache and check that the entries are present
+ IgniteCache<Integer, String> cache = grid().cache(null);
+
+ // for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache
+ for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count)) {
+ assertEquals(TEST_DATA.get(key), cache.get(key));
+ }
+
+ // assert that the cache exactly the specified amount of elements
+ assertEquals(count, cache.size(CachePeekMode.ALL));
+
+ // remove the event listener
+ grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener);
+ }
+
+ public static StreamSingleTupleExtractor<MqttMessage, Integer, String> singleTupleExtractor() {
+ return new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
+ @Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
+ List<String> s = Splitter.on(",").splitToList(new String(msg.getPayload()));
+ return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
+ }
+ };
+ }
+ public static StreamMultipleTupleExtractor<MqttMessage, Integer, String> multipleTupleExtractor() {
+ return new StreamMultipleTupleExtractor<MqttMessage, Integer, String>() {
+ @Override public Map<Integer, String> extract(MqttMessage msg) {
+ final Map<String, String> map = Splitter.on("\n")
+ .omitEmptyStrings()
+ .withKeyValueSeparator(",")
+ .split(new String(msg.getPayload()));
+ final Map<Integer, String> answer = new HashMap<>();
+ F.forEach(map.keySet(), new IgniteInClosure<String>() {
+ @Override public void apply(String s) {
+ answer.put(Integer.parseInt(s), map.get(s));
+ }
+ });
+ return answer;
+ }
+ };
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
deleted file mode 100644
index e2ed0f0..0000000
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.mqtt;
-
-/**
- * Test transformers for MqttStreamer tests.
- *
- * @author Raul Kripalani
- */
-public class TestTupleExtractors {
-
-
-}
\ No newline at end of file
[4/5] ignite git commit: IGNITE-535 Finish MQTT Streamer docs and
tests. Upgrade latter to AMQ 5.12.0.
Posted by ra...@apache.org.
IGNITE-535 Finish MQTT Streamer docs and tests. Upgrade latter to AMQ 5.12.0.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/296dd6e7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/296dd6e7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/296dd6e7
Branch: refs/heads/master
Commit: 296dd6e7d86fe6d0914a9fbf8062632c04e4d22c
Parents: f03f3a3
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 17:24:44 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 17:24:44 2015 +0100
----------------------------------------------------------------------
modules/mqtt/pom.xml | 9 +-
.../apache/ignite/stream/mqtt/MqttStreamer.java | 156 ++++++++++++++++++-
.../stream/mqtt/IgniteMqttStreamerTest.java | 80 +++++++++-
3 files changed, 224 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mqtt/pom.xml b/modules/mqtt/pom.xml
index 4b0b46c..21511e8 100644
--- a/modules/mqtt/pom.xml
+++ b/modules/mqtt/pom.xml
@@ -37,7 +37,7 @@
<properties>
<paho.version>1.0.2</paho.version>
- <activemq.version>5.11.1</activemq.version>
+ <activemq.version>5.12.0</activemq.version>
<guava-retryier.version>2.0.0</guava-retryier.version>
</properties>
@@ -69,13 +69,6 @@
<dependency>
<groupId>org.apache.activemq</groupId>
- <artifactId>activemq-kahadb-store</artifactId>
- <version>${activemq.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.activemq</groupId>
<artifactId>activemq-mqtt</artifactId>
<version>${activemq.version}</version>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
index b86d385..f18ae42 100644
--- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -62,12 +62,17 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
* <li>Specifying the subscriber's QoS for a single topic or for multiple topics.</li>
* <li>Allows setting {@link MqttConnectOptions} to support features like last will testament, persistent
* sessions, etc.</li>
- * <li>Specifying the client ID.</li>
+ * <li>Specifying the client ID. A random one will be generated and maintained throughout reconnections if the user
+ * does not provide one.</li>
+ * <li>(Re-)Connection retries based on the <i>guava-retrying</i> library. Retry wait and retry stop policies
+ * can be configured.</li>
+ * <li>Blocking the start() method until connected for the first time.</li>
* </ul>
*
- * Note: features like durable subscriptions, last will testament, etc. must be configured via the
+ * Note: features like durable subscriptions, last will testament, etc. can be configured via the
* {@link #connectOptions} property.
*
+ * @see <a href="https://github.com/rholder/guava-retrying">guava-retrying library</a>
* @author Raul Kripalani
*/
public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> implements MqttCallback {
@@ -75,46 +80,65 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
/** Logger. */
private IgniteLogger log;
+ /** The MQTT client object for internal use. */
private MqttClient client;
+ /** The broker URL, set by the user. */
private String brokerUrl;
+ /** The topic to subscribe to, if a single topic. */
private String topic;
+ /** The quality of service to use for a single topic subscription (optional). */
private Integer qualityOfService;
+ /** The topics to subscribe to, if many. */
private List<String> topics;
+ /** The qualities of service to use for multiple topic subscriptions. If specified, it must contain the same
+ * number of elements as {@link #topics}. */
private List<Integer> qualitiesOfService;
- /** Client ID in case we're using durable subscribers. */
+ /** The MQTT client ID (optional). */
private String clientId;
+ /** A configurable persistence mechanism. If not set, Paho will use its default. */
private MqttClientPersistence persistence;
+ /** The MQTT client connect options, where users can configured the last will and testament, durability, etc. */
private MqttConnectOptions connectOptions;
- // disconnect parameters
+ /** Quiesce timeout on disconnection. */
private Integer disconnectQuiesceTimeout;
+ /** Whether to disconnect forcibly or not. */
private boolean disconnectForcibly;
+ /** If disconnecting forcibly, the timeout. */
private Integer disconnectForciblyTimeout;
+ /** The strategy to determine how long to wait between retry attempts. By default, this streamer uses a
+ * Fibonacci-based strategy. */
private WaitStrategy retryWaitStrategy = WaitStrategies.fibonacciWait();
+ /** The strategy to determine when to stop retrying to (re-)connect. By default, we never stop. */
private StopStrategy retryStopStrategy = StopStrategies.neverStop();
+ /** The internal connection retrier object with a thread pool of size 1. */
private MqttConnectionRetrier connectionRetrier;
+ /** Whether to block the start() method until connected for the first time. */
+ private boolean blockUntilConnected;
+
+ /** State keeping. */
private volatile boolean stopped = true;
+ /** State keeping. */
private volatile boolean connected;
+ /** Cached log prefix for cache messages. */
private String cachedLogPrefix;
- private boolean blockUntilConnected;
-
/**
* Starts streamer.
*
@@ -136,7 +160,11 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " +
"both single and multiple tuple extractor");
A.notNullOrEmpty(brokerUrl, "broker URL");
- A.notNullOrEmpty(clientId, "client ID");
+
+ // if the client ID is empty, generate one
+ if (clientId == null || clientId.length() == 0) {
+ clientId = MqttClient.generateClientId();
+ }
// if we have both a single topic and a list of topics (but the list of topic is not of
// size 1 and == topic, as this would be a case of re-initialization), fail
@@ -257,6 +285,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
// MQTT Client callback methods
// -------------------------------
+ /**
+ * {@inheritDoc}
+ */
@Override public void connectionLost(Throwable throwable) {
connected = false;
@@ -268,6 +299,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
connectionRetrier.connect();
}
+ /**
+ * {@inheritDoc}
+ */
@Override public void messageArrived(String topic, MqttMessage message) throws Exception {
if (getMultipleTupleExtractor() != null) {
Map<K, V> entries = getMultipleTupleExtractor().extract(message);
@@ -285,6 +319,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override public void deliveryComplete(IMqttDeliveryToken token) {
// ignore, as we don't send messages
}
@@ -293,127 +330,229 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
// Getters and setters
// -------------------------------
+ /**
+ * @return
+ */
public String getBrokerUrl() {
return brokerUrl;
}
+ /**
+ * @param brokerUrl The Broker URL (compulsory).
+ */
public void setBrokerUrl(String brokerUrl) {
this.brokerUrl = brokerUrl;
}
+ /**
+ * @return
+ */
public String getTopic() {
return topic;
}
+ /**
+ * @param topic The topic to subscribe to, if a single topic.
+ */
public void setTopic(String topic) {
this.topic = topic;
}
+ /**
+ * @return
+ */
public Integer getQualityOfService() {
return qualityOfService;
}
+ /**
+ * @param qualityOfService The quality of service to use for a single topic subscription (optional).
+ */
public void setQualityOfService(Integer qualityOfService) {
this.qualityOfService = qualityOfService;
}
+ /**
+ * @return
+ */
public List<String> getTopics() {
return topics;
}
+ /**
+ * @param topics The topics to subscribe to, if many.
+ */
public void setTopics(List<String> topics) {
this.topics = topics;
}
+ /**
+ * @return
+ */
public List<Integer> getQualitiesOfService() {
return qualitiesOfService;
}
+ /**
+ * @param qualitiesOfService The qualities of service to use for multiple topic subscriptions.
+ * If specified, the list must contain the same number of elements as {@link #topics}.
+ */
public void setQualitiesOfService(List<Integer> qualitiesOfService) {
this.qualitiesOfService = qualitiesOfService;
}
+ /**
+ * @return
+ */
public String getClientId() {
return clientId;
}
+ /**
+ * @param clientId The MQTT client ID (optional). If one is not provided, we'll create one for you and maintain
+ * it througout any reconnection attempts.
+ */
public void setClientId(String clientId) {
this.clientId = clientId;
}
+ /**
+ * @return
+ */
public MqttClientPersistence getPersistence() {
return persistence;
}
+ /**
+ * @param persistence A configurable persistence mechanism. If not set, Paho will use its default.
+ */
public void setPersistence(MqttClientPersistence persistence) {
this.persistence = persistence;
}
+ /**
+ * @return
+ */
public MqttConnectOptions getConnectOptions() {
return connectOptions;
}
+ /**
+ * @param connectOptions The MQTT client connect options, where users can configured the last will and testament, durability, etc.
+ */
public void setConnectOptions(MqttConnectOptions connectOptions) {
this.connectOptions = connectOptions;
}
+ /**
+ * @return
+ */
public boolean isDisconnectForcibly() {
return disconnectForcibly;
}
+ /**
+ * @param disconnectForcibly Whether to disconnect forcibly or not. By default, it's false.
+ */
public void setDisconnectForcibly(boolean disconnectForcibly) {
this.disconnectForcibly = disconnectForcibly;
}
+ /**
+ * @return
+ */
public Integer getDisconnectQuiesceTimeout() {
return disconnectQuiesceTimeout;
}
+ /**
+ * @param disconnectQuiesceTimeout Quiesce timeout on disconnection. If not provided, this streamer won't use any.
+ */
public void setDisconnectQuiesceTimeout(Integer disconnectQuiesceTimeout) {
this.disconnectQuiesceTimeout = disconnectQuiesceTimeout;
}
+ /**
+ * @return
+ */
public Integer getDisconnectForciblyTimeout() {
return disconnectForciblyTimeout;
}
+ /**
+ * @param disconnectForciblyTimeout If disconnecting forcibly, the timeout. Compulsory in that case.
+ */
public void setDisconnectForciblyTimeout(Integer disconnectForciblyTimeout) {
this.disconnectForciblyTimeout = disconnectForciblyTimeout;
}
+ /**
+ * @return
+ */
public WaitStrategy getRetryWaitStrategy() {
return retryWaitStrategy;
}
+ /**
+ * @param retryWaitStrategy The strategy to determine how long to wait between retry attempts.
+ * By default, this streamer uses a Fibonacci-based strategy.
+ */
public void setRetryWaitStrategy(WaitStrategy retryWaitStrategy) {
this.retryWaitStrategy = retryWaitStrategy;
}
+ /**
+ * @return
+ */
public StopStrategy getRetryStopStrategy() {
return retryStopStrategy;
}
+ /**
+ * @param retryStopStrategy The strategy to determine when to stop retrying to (re-)connect. By default, we never stop.
+ */
public void setRetryStopStrategy(StopStrategy retryStopStrategy) {
this.retryStopStrategy = retryStopStrategy;
}
+ /**
+ * @return
+ */
public boolean isBlockUntilConnected() {
return blockUntilConnected;
}
+ /**
+ * @param blockUntilConnected Whether to block the start() method until connected for the first time. By default,
+ * false.
+ */
public void setBlockUntilConnected(boolean blockUntilConnected) {
this.blockUntilConnected = blockUntilConnected;
}
+ /**
+ * A utility class to help us with (re-)connecting to the MQTT broker. It uses a single-threaded executor to perform
+ * the (re-)connections.
+ */
private class MqttConnectionRetrier {
+ /** The guava-retrying retrier object. */
private final Retryer<Boolean> retrier;
+
+ /** Single-threaded pool. */
private ExecutorService executor = Executors.newSingleThreadExecutor();
+ /**
+ * Constructor.
+ * @param retrier The retryier object.
+ */
public MqttConnectionRetrier(Retryer<Boolean> retrier) {
this.retrier = retrier;
}
+ /**
+ * Method that is called by the streamer to ask us to (re-)connect.
+ */
public void connect() {
Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
@@ -460,6 +599,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
}
}
+ /**
+ * Stops this connection utility class by shutting down the thread pool.
+ */
public void stop() {
executor.shutdownNow();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
index 012486a..5ac7339 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -65,24 +65,41 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
*/
public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
+ /** The test data. */
private static final Map<Integer, String> TEST_DATA = new HashMap<>();
+
+ /** Topic name for single topic tests. */
private static final String SINGLE_TOPIC_NAME = "abc";
+
+ /** Topic names for multiple topic tests. */
private static final List<String> MULTIPLE_TOPIC_NAMES = Arrays.asList("def", "ghi", "jkl", "mno");
+ /** The AMQ broker with an MQTT interface. */
private BrokerService broker;
+
+ /** The MQTT client. */
private MqttClient client;
+
+ /** The broker URL. */
private String brokerUrl;
+
+ /** The broker port. **/
private int port;
+
+ /** The MQTT streamer currently under test. */
private MqttStreamer<Integer, String> streamer;
+
+ /** The UUID of the currently active remote listener. */
private UUID remoteListener;
+ /** The Ignite data streamer. */
+ private IgniteDataStreamer<Integer, String> dataStreamer;
+
static {
for (int i = 0; i < 100; i++)
TEST_DATA.put(i, "v" + i);
}
- private IgniteDataStreamer<Integer, String> dataStreamer;
-
/** Constructor. */
public IgniteMqttStreamerTest() {
super(true);
@@ -99,14 +116,17 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
// create the broker
broker = new BrokerService();
- broker.deleteAllMessages();
+ broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(false);
+ broker.setPersistenceAdapter(null);
+ broker.setPersistenceFactory(null);
PolicyMap policyMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry();
policy.setQueuePrefetch(1);
broker.setDestinationPolicy(policyMap);
broker.getDestinationPolicy().setDefaultEntry(policy);
+ broker.setSchedulerSupport(false);
// add the MQTT transport connector to the broker
broker.addConnector("mqtt://localhost:" + port);
@@ -143,6 +163,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
}
+ /**
+ * @throws Exception
+ */
public void testSingleTopic_NoQoS_OneEntryPerMessage() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -162,6 +185,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertCacheEntriesLoaded(50);
}
+ /**
+ * @throws Exception
+ */
public void testMultipleTopics_NoQoS_OneEntryPerMessage() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -185,6 +211,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
}
+ /**
+ * @throws Exception
+ */
public void testSingleTopic_NoQoS_MultipleEntriesOneMessage() throws Exception {
// configure streamer
streamer.setMultipleTupleExtractor(multipleTupleExtractor());
@@ -204,6 +233,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertCacheEntriesLoaded(50);
}
+ /**
+ * @throws Exception
+ */
public void testMultipleTopics_NoQoS_MultipleEntriesOneMessage() throws Exception {
// configure streamer
streamer.setMultipleTupleExtractor(multipleTupleExtractor());
@@ -227,6 +259,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
}
+ /**
+ * @throws Exception
+ */
public void testSingleTopic_NoQoS_ConnectOptions_Durable() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -265,6 +300,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertCacheEntriesLoaded(100);
}
+ /**
+ * @throws Exception
+ */
public void testSingleTopic_NoQoS_Reconnect() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -306,6 +344,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertCacheEntriesLoaded(100);
}
+ /**
+ * @throws Exception
+ */
public void testSingleTopic_NoQoS_RetryOnce() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -339,6 +380,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
}
+ /**
+ * @throws Exception
+ */
public void testMultipleTopics_MultipleQoS_OneEntryPerMessage() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -363,6 +407,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
}
+ /**
+ * @throws Exception
+ */
public void testMultipleTopics_MultipleQoS_Mismatch() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -379,6 +426,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
}
+ /**
+ * @throws Exception
+ */
private MqttStreamer<Integer, String> createMqttStreamer(IgniteDataStreamer<Integer, String> dataStreamer) {
MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
streamer.setIgnite(grid());
@@ -393,7 +443,10 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
return streamer;
}
- public void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
+ /**
+ * @throws Exception
+ */
+ private void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
if (singleMessage) {
final List<StringBuilder> sbs = new ArrayList<>(topics.size());
// initialize String Builders for each topic
@@ -423,6 +476,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
}
}
+ /**
+ * @throws Exception
+ */
private CountDownLatch subscribeToPutEvents(int expect) {
Ignite ignite = grid();
@@ -439,14 +495,16 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
return latch;
}
+ /**
+ * @throws Exception
+ */
private void assertCacheEntriesLoaded(int count) {
// get the cache and check that the entries are present
IgniteCache<Integer, String> cache = grid().cache(null);
// for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache
- for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count)) {
+ for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count))
assertEquals(TEST_DATA.get(key), cache.get(key));
- }
// assert that the cache exactly the specified amount of elements
assertEquals(count, cache.size(CachePeekMode.ALL));
@@ -455,6 +513,11 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener);
}
+ /**
+ * Returns a {@link StreamSingleTupleExtractor} for testing.
+ *
+ * @throws Exception
+ */
public static StreamSingleTupleExtractor<MqttMessage, Integer, String> singleTupleExtractor() {
return new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
@Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
@@ -464,6 +527,11 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
};
}
+ /**
+ * Returns a {@link StreamMultipleTupleExtractor} for testing.
+ *
+ * @throws Exception
+ */
public static StreamMultipleTupleExtractor<MqttMessage, Integer, String> multipleTupleExtractor() {
return new StreamMultipleTupleExtractor<MqttMessage, Integer, String>() {
@Override public Map<Integer, String> extract(MqttMessage msg) {
[5/5] ignite git commit: IGNITE-535 Merge MQTT Streamer into master.
Posted by ra...@apache.org.
IGNITE-535 Merge MQTT Streamer into master.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/88acd318
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/88acd318
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/88acd318
Branch: refs/heads/master
Commit: 88acd318b84ce3bff8c061bb34718e0e5f7127fb
Parents: 421a523 296dd6e
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 17:26:04 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 17:26:04 2015 +0100
----------------------------------------------------------------------
modules/mqtt/pom.xml | 114 ++++
.../apache/ignite/stream/mqtt/MqttStreamer.java | 611 +++++++++++++++++++
.../stream/mqtt/IgniteMqttStreamerTest.java | 553 +++++++++++++++++
.../mqtt/IgniteMqttStreamerTestSuite.java | 34 ++
pom.xml | 1 +
5 files changed, 1313 insertions(+)
----------------------------------------------------------------------
[3/5] ignite git commit: Merge branch 'master' into
feature/ignite-535-mqtt
Posted by ra...@apache.org.
Merge branch 'master' into feature/ignite-535-mqtt
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f03f3a3b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f03f3a3b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f03f3a3b
Branch: refs/heads/master
Commit: f03f3a3b48fa105f318e9493440671188770f4ef
Parents: 53683e2 421a523
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 16:36:53 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 16:36:53 2015 +0100
----------------------------------------------------------------------
.../org/apache/ignite/IgniteAtomicLong.java | 15 +-
.../apache/ignite/IgniteAtomicReference.java | 9 +-
.../org/apache/ignite/IgniteAtomicSequence.java | 9 +-
.../org/apache/ignite/IgniteAtomicStamped.java | 13 +-
.../configuration/NearCacheConfiguration.java | 18 +-
.../apache/ignite/internal/IgniteKernal.java | 7 -
.../processors/cache/GridCacheContext.java | 6 +-
.../cache/GridCacheEvictionManager.java | 6 +-
.../cache/GridCacheEvictionResponse.java | 2 +-
.../processors/cache/GridCacheIoManager.java | 47 ++--
.../processors/cache/GridCacheMessage.java | 7 +
.../processors/cache/GridCacheMvccManager.java | 34 ++-
.../GridCachePartitionExchangeManager.java | 41 +++-
.../processors/cache/GridCacheProcessor.java | 28 ++-
.../GridDistributedLockResponse.java | 6 +-
.../GridDistributedTxPrepareResponse.java | 6 +-
.../distributed/dht/GridDhtLocalPartition.java | 26 +-
.../distributed/dht/GridDhtTopologyFuture.java | 6 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 2 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +-
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 12 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 16 +-
.../dht/atomic/GridNearAtomicUpdateRequest.java | 2 +
.../atomic/GridNearAtomicUpdateResponse.java | 11 +-
.../colocated/GridDhtColocatedLockFuture.java | 44 +++-
.../dht/preloader/GridDhtForceKeysFuture.java | 2 +-
.../dht/preloader/GridDhtForceKeysResponse.java | 6 +-
.../GridDhtPartitionsExchangeFuture.java | 19 +-
.../distributed/near/GridNearGetResponse.java | 6 +-
.../distributed/near/GridNearLockFuture.java | 26 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 20 +-
.../near/GridNearTxFinishResponse.java | 6 +-
.../query/GridCacheDistributedQueryFuture.java | 27 +-
.../cache/query/GridCacheLocalQueryFuture.java | 5 +
.../cache/query/GridCacheQueryAdapter.java | 170 ++++++++-----
.../query/GridCacheQueryFutureAdapter.java | 11 +-
.../cache/query/GridCacheQueryManager.java | 30 ++-
.../cache/query/GridCacheQueryResponse.java | 6 +-
.../continuous/CacheContinuousQueryHandler.java | 12 +-
.../transactions/IgniteTxLocalAdapter.java | 4 +-
.../ignite/internal/util/GridSpinBusyLock.java | 10 +
.../ignite/spi/discovery/tcp/ServerImpl.java | 12 +-
.../IgniteCacheEntryProcessorNodeJoinTest.java | 24 +-
.../distributed/CacheAffEarlySelfTest.java | 245 +++++++++++++++++++
.../GridCacheSwapScanQueryAbstractSelfTest.java | 118 ++++-----
.../processors/igfs/IgfsAbstractSelfTest.java | 5 +-
.../loadtests/hashmap/GridCacheTestContext.java | 4 +-
.../ignite/testframework/GridTestUtils.java | 14 +-
.../cache/CacheIndexStreamerTest.java | 137 +++++++++++
...CacheScanPartitionQueryFallbackSelfTest.java | 244 +++++-------------
.../IgniteCacheQueryNodeRestartSelfTest2.java | 2 -
.../Apache.Ignite.Core/Impl/IgniteManager.cs | 2 -
.../ignite/visor/commands/VisorConsole.scala | 37 ++-
.../config/benchmark-put-indexed-val.properties | 2 +-
modules/yardstick/config/ignite-base-config.xml | 2 +-
.../yardstick/IgniteBenchmarkArguments.java | 24 +-
.../org/apache/ignite/yardstick/IgniteNode.java | 12 +-
57 files changed, 1084 insertions(+), 535 deletions(-)
----------------------------------------------------------------------