You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/22 16:59:44 UTC
[04/23] ignite git commit: IGNITE-535 (WIP) Implement MQTT Streamer.
IGNITE-535 (WIP) Implement MQTT Streamer.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6b53f1bb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6b53f1bb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6b53f1bb
Branch: refs/heads/ignite-1513-final
Commit: 6b53f1bb2699ae7a6ea8ae277de1bfc3ecbd7b6a
Parents: b80b171
Author: Raul Kripalani <ra...@apache.org>
Authored: Tue Sep 15 00:45:58 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Tue Sep 15 22:30:09 2015 +0100
----------------------------------------------------------------------
modules/mqtt/pom.xml | 110 +++++++++
.../apache/ignite/stream/mqtt/MqttStreamer.java | 243 +++++++++++++++++++
.../stream/mqtt/IgniteMqttStreamerTest.java | 50 ++++
.../mqtt/IgniteMqttStreamerTestSuite.java | 34 +++
.../ignite/stream/mqtt/TestTupleExtractors.java | 28 +++
pom.xml | 1 +
6 files changed, 466 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mqtt/pom.xml b/modules/mqtt/pom.xml
new file mode 100644
index 0000000..b108180
--- /dev/null
+++ b/modules/mqtt/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <artifactId>ignite-mqtt</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <url>http://ignite.apache.org</url>
+
+ <properties>
+ <!-- Version of the Eclipse Paho MQTT client (org.eclipse.paho.client.mqttv3 dependency). -->
+ <paho.version>1.0.2</paho.version>
+ <!--
+ Version of the Moquette broker (test-scoped moquette-broker dependency).
+ NOTE(review): the property name is misspelled ("mosquette"); it is left as is because the
+ moquette-broker dependency refers to it by this exact name.
+ -->
+ <mosquette.version>0.7</mosquette.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.paho</groupId>
+ <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+ <version>${paho.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.moquette</groupId>
+ <artifactId>moquette-broker</artifactId>
+ <version>${mosquette.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-log4j</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spring</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <!-- Repositories for Moquette (embedded MQTT broker for tests) and for Eclipse Paho (MQTT client) -->
+ <repositories>
+ <repository>
+ <id>bintray</id>
+ <url>http://dl.bintray.com/andsel/maven/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ <repository>
+ <id>Eclipse Paho Repo</id>
+ <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+</project>
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
new file mode 100644
index 0000000..00a89ab
--- /dev/null
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.stream.StreamAdapter;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/**
+ * Streamer that consumes from an MQTT topic and feeds key-value pairs into an {@link IgniteDataStreamer} instance,
+ * using Eclipse Paho as an MQTT client.
+ * <p>
+ * You must also provide a {@link StreamSingleTupleExtractor} or a {@link StreamMultipleTupleExtractor} to extract
+ * cache tuples out of the incoming message.
+ * <p>
+ * This Streamer has many features:
+ *
+ * <ul>
+ * <li>Subscribing to a single topic or multiple topics at once.</li>
+ * <li>Specifying the subscriber's QoS for a single topic or for multiple topics.</li>
+ * <li>Allows setting {@link MqttConnectOptions} to support features like last will testament, persistent
+ * sessions, etc.</li>
+ * <li>Specifying the client ID.</li>
+ * </ul>
+ *
+ * Note: features like durable subscriptions, last will testament, etc. must be configured via the
+ * {@link #connectOptions} property.
+ *
+ * @author Raul Kripalani
+ */
+public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> implements MqttCallback {
+
+ /** Logger; obtained from the Ignite instance in {@link #start()}. */
+ private IgniteLogger log;
+
+ /** Paho MQTT client; created in {@link #start()} from the broker URL, client ID and optional persistence. */
+ private MqttClient client;
+
+ /** URL of the MQTT broker; must be non-empty when the streamer is started. */
+ private String brokerUrl;
+
+ /** Single topic to subscribe to; cannot be combined with a non-empty {@link #topics} list. */
+ private String topic;
+
+ /**
+ * QoS used when subscribing to the single {@link #topic}; when {@code null} the subscription is made without
+ * an explicit QoS. Cannot be combined with a non-empty {@link #qualitiesOfService} list.
+ */
+ private Integer qualityOfService;
+
+ /** Topics to subscribe to at once; {@link #start()} replaces {@code null} with an empty list. */
+ private List<String> topics;
+
+ /**
+ * QoS values for the entries of {@link #topics}; must be empty or have the same size as {@link #topics}.
+ * {@link #start()} replaces {@code null} with an empty list.
+ */
+ private List<Integer> qualitiesOfService;
+
+ /** Client ID in case we're using durable subscribers; must be non-empty when the streamer is started. */
+ private String clientId;
+
+ /** Optional persistence passed to the client constructor; when {@code null} the client is created without it. */
+ private MqttClientPersistence persistence;
+
+ /** Optional connection options (last will testament, persistent sessions, etc.) consulted when connecting. */
+ private MqttConnectOptions connectOptions;
+
+ // disconnect parameters
+ /** Optional quiesce timeout handed to the client's disconnect call in {@link #stop()}. */
+ private Integer disconnectQuiesceTimeout;
+
+ /** Whether {@link #stop()} uses the client's {@code disconnectForcibly} variants instead of {@code disconnect}. */
+ private boolean disconnectForcibly;
+
+ /**
+ * Optional timeout for a forcible disconnect; {@link #start()} requires it whenever a forcible disconnect is
+ * combined with a quiesce timeout.
+ */
+ private Integer disconnectForciblyTimeout;
+
+ /** Whether the streamer is currently stopped; initially {@code true}, cleared by a successful {@link #start()}. */
+ private volatile boolean stopped = true;
+
+    /**
+     * Starts streamer: validates the configuration, creates the MQTT client, registers this streamer as the
+     * client's callback, connects to the broker and subscribes to the configured topic(s).
+     * <p>
+     * If anything fails after the client has been created, the client is disconnected and closed on a best-effort
+     * basis so that a failed start does not leave a live connection behind.
+     *
+     * @throws IgniteException If the streamer is already started, the configuration is invalid, or connecting or
+     *      subscribing fails.
+     */
+    public void start() throws IgniteException {
+        if (!stopped)
+            throw new IgniteException("Attempted to start an already started MQTT Streamer");
+
+        // for simplicity, if these are null initialize to empty lists
+        topics = topics == null ? new ArrayList<String>() : topics;
+        qualitiesOfService = qualitiesOfService == null ? new ArrayList<Integer>() : qualitiesOfService;
+
+        try {
+            // parameter validations
+            A.notNull(getStreamer(), "streamer");
+            A.notNull(getIgnite(), "ignite");
+
+            // at least one tuple extractor is required, but never both of them
+            A.ensure(getSingleTupleExtractor() != null || getMultipleTupleExtractor() != null,
+                "tuple extractor missing");
+            A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " +
+                "both single and multiple tuple extractor");
+            A.notNullOrEmpty(brokerUrl, "broker URL");
+            A.notNullOrEmpty(clientId, "client ID");
+
+            boolean hasSingleTopic = topic != null && topic.length() > 0;
+
+            // if we have both a single topic and a list of topics, fail
+            if (hasSingleTopic && !topics.isEmpty())
+                throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time");
+
+            // without any topic there is nothing to subscribe to; fail before a connection is opened
+            A.ensure(hasSingleTopic || !topics.isEmpty(), "either a single topic or a list of topics is required");
+
+            // if we have both a single QoS and list, fail
+            if (qualityOfService != null && !qualitiesOfService.isEmpty())
+                throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time");
+
+            // Paho API requires disconnect timeout if providing a quiesce timeout and disconnecting forcibly
+            if (disconnectForcibly && disconnectQuiesceTimeout != null) {
+                A.notNull(disconnectForciblyTimeout, "disconnect timeout cannot be null when disconnecting forcibly " +
+                    "with quiesce");
+            }
+
+            // if we have multiple topics
+            if (!topics.isEmpty()) {
+                for (String t : topics)
+                    A.notNullOrEmpty(t, "topic in list of topics");
+
+                A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(), "qualities of " +
+                    "service must be either empty or have the same size as topics list");
+            }
+
+            // create logger
+            log = getIgnite().log();
+
+            // create the mqtt client
+            if (persistence == null)
+                client = new MqttClient(brokerUrl, clientId);
+            else
+                client = new MqttClient(brokerUrl, clientId, persistence);
+
+            // register this streamer as the callback, otherwise messageArrived() is never invoked
+            client.setCallback(this);
+
+            connectAndSubscribe();
+
+            stopped = false;
+        }
+        catch (Throwable t) {
+            // Release a client that was created (and possibly connected) before the failure: stop() refuses to
+            // run while the streamer is still flagged as stopped, so nobody else could clean it up.
+            if (client != null) {
+                try {
+                    if (client.isConnected())
+                        client.disconnectForcibly();
+
+                    client.close();
+                }
+                catch (Throwable ignored) {
+                    // best-effort cleanup; the original failure is the one reported below
+                }
+
+                client = null;
+            }
+
+            throw new IgniteException("Exception while initializing MqttStreamer", t);
+        }
+    }
+
+    /**
+     * Connects the client to the broker and subscribes to the configured topic(s).
+     * <p>
+     * The connection options are used when they have been provided; otherwise the client's no-argument connect
+     * is used. If the list of topics is non-empty it is subscribed to (with the matching list of QoS values when
+     * one is present); otherwise the single topic is subscribed to (with the single QoS when one is present).
+     *
+     * @throws MqttException If connecting or subscribing fails.
+     */
+    private void connectAndSubscribe() throws MqttException {
+        // connect, honouring the user-supplied options when there are any
+        if (connectOptions == null)
+            client.connect();
+        else
+            client.connect(connectOptions);
+
+        // subscribe to multiple topics
+        if (!topics.isEmpty()) {
+            String[] topicNames = topics.toArray(new String[0]);
+
+            if (qualitiesOfService.isEmpty())
+                client.subscribe(topicNames);
+            else {
+                // the client takes primitive QoS values, so unbox the configured list
+                int[] qoses = new int[qualitiesOfService.size()];
+
+                for (int i = 0; i < qoses.length; i++)
+                    qoses[i] = qualitiesOfService.get(i);
+
+                client.subscribe(topicNames, qoses);
+            }
+
+            return;
+        }
+
+        // subscribe to a single topic
+        if (qualityOfService == null)
+            client.subscribe(topic);
+        else
+            client.subscribe(topic, qualityOfService);
+    }
+
+    /**
+     * Stops streamer: disconnects from the broker using the configured disconnect parameters, closes the client
+     * and flags the streamer as stopped so that it can be started again.
+     *
+     * @throws IgniteException If the streamer is already stopped, or disconnecting or closing the client fails.
+     */
+    public void stop() throws IgniteException {
+        if (stopped)
+            throw new IgniteException("Attempted to stop an already stopped MQTT Streamer");
+
+        try {
+            if (disconnectForcibly) {
+                if (disconnectQuiesceTimeout == null && disconnectForciblyTimeout == null)
+                    client.disconnectForcibly();
+                else if (disconnectForciblyTimeout != null && disconnectQuiesceTimeout == null)
+                    client.disconnectForcibly(disconnectForciblyTimeout);
+                else
+                    client.disconnectForcibly(disconnectQuiesceTimeout, disconnectForciblyTimeout);
+            }
+            else {
+                if (disconnectQuiesceTimeout == null)
+                    client.disconnect();
+                else
+                    client.disconnect(disconnectQuiesceTimeout);
+            }
+
+            // release the resources held by the disconnected client
+            client.close();
+
+            // only flag as stopped once the client is really gone, so a failed stop can be retried
+            stopped = true;
+        }
+        catch (Throwable t) {
+            throw new IgniteException("Exception while stopping MqttStreamer", t);
+        }
+    }
+
+    /**
+     * {@link MqttCallback} hook for a lost broker connection: logs the cause and immediately tries to connect
+     * and subscribe again. A failed reconnect attempt is logged and not retried.
+     *
+     * @param throwable Cause of the connection loss.
+     */
+    @Override public void connectionLost(Throwable throwable) {
+        log.warning(String.format("MQTT Connection to server %s was lost due to", brokerUrl), throwable);
+
+        // TODO: handle reconnect attempts with an optional backoff mechanism (linear, exponential, fibonacci)
+        try {
+            connectAndSubscribe();
+        }
+        catch (MqttException e) {
+            // report through the Ignite logger rather than stderr so the failure ends up in the node's log
+            log.error(String.format("Failed to reconnect to MQTT server %s", brokerUrl), e);
+        }
+    }
+
+    /**
+     * {@link MqttCallback} hook for an incoming message: extracts cache tuples from the message and hands them
+     * to the data streamer. The multiple tuple extractor is used when one is set, the single tuple extractor
+     * otherwise.
+     *
+     * @param topic Topic the message arrived on (not used).
+     * @param message Incoming MQTT message.
+     * @throws Exception If extracting or streaming the tuples fails.
+     */
+    @Override public void messageArrived(String topic, MqttMessage message) throws Exception {
+        if (getMultipleTupleExtractor() == null) {
+            // single tuple per message
+            Map.Entry<K, V> tuple = getSingleTupleExtractor().extract(message);
+
+            getStreamer().addData(tuple);
+
+            return;
+        }
+
+        // several tuples per message
+        Map<K, V> tuples = getMultipleTupleExtractor().extract(message);
+
+        getStreamer().addData(tuples);
+    }
+
+ /**
+ * {@link MqttCallback} hook for completed deliveries; intentionally a no-op because this streamer does not
+ * send messages.
+ *
+ * @param token Delivery token (ignored).
+ */
+ @Override public void deliveryComplete(IMqttDeliveryToken token) {
+ // ignore, we don't send messages
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
new file mode 100644
index 0000000..59730fa
--- /dev/null
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Test for {@link MqttStreamer}.
+ * <p>
+ * NOTE(review): this is only a skeleton so far; it prepares and clears a cache around each test but does not
+ * contain any test method yet.
+ *
+ * @author Raul Kripalani
+ */
+public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
+
+ /**
+ * Constructor.
+ * <p>
+ * NOTE(review): presumably the {@code true} flag asks the base class to start a grid for the test; confirm
+ * against {@code GridCommonAbstractTest}.
+ */
+ public IgniteMqttStreamerTest() {
+ super(true);
+ }
+
+ /**
+ * Gets or creates the cache described by {@code defaultCacheConfiguration()} on the test grid.
+ *
+ * @throws Exception If failed.
+ */
+ @Before @SuppressWarnings("unchecked")
+ public void beforeTest() throws Exception {
+ grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+
+ }
+
+ /**
+ * Clears the cache that is looked up with a {@code null} name on the test grid.
+ *
+ * @throws Exception If failed.
+ */
+ @After
+ public void afterTest() throws Exception {
+ grid().cache(null).clear();
+
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
new file mode 100644
index 0000000..ff25145
--- /dev/null
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * MQTT streamer tests: a JUnit suite that currently groups only {@link IgniteMqttStreamerTest}.
+ *
+ * @author Raul Kripalani
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ IgniteMqttStreamerTest.class
+})
+public class IgniteMqttStreamerTestSuite {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
new file mode 100644
index 0000000..e2ed0f0
--- /dev/null
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+/**
+ * Test tuple extractors for MqttStreamer tests.
+ * <p>
+ * NOTE(review): empty placeholder so far; no extractor is defined here yet.
+ *
+ * @author Raul Kripalani
+ */
+public class TestTupleExtractors {
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b47958f..c19a9b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
<module>modules/kafka</module>
<module>modules/yarn</module>
<module>modules/jms11</module>
+ <module>modules/mqtt</module>
<module>modules/zookeeper</module>
<module>modules/platform</module>
</modules>