You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 22:34:01 UTC
[05/14] storm git commit: STORM-2416 Release Packaging Improvements
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/pom.xml b/external/storm-mqtt/core/pom.xml
deleted file mode 100644
index dbb0396..0000000
--- a/external/storm-mqtt/core/pom.xml
+++ /dev/null
@@ -1,126 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <!-- The storm-mqtt module itself, packaged as a plain jar. -->
- <artifactId>storm-mqtt</artifactId>
- <packaging>jar</packaging>
-
- <name>storm-mqtt</name>
-
- <!-- No groupId or version is declared for this module, so both are inherited from this parent. -->
- <parent>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-mqtt-parent</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
-
-
-
- <!-- Additional repository, enabled for releases only (snapshots are disabled). -->
- <!-- NOTE(review): the URL is plain http; confirm which dependency still needs this repository. -->
- <repositories>
- <repository>
- <id>bintray</id>
- <url>http://dl.bintray.com/andsel/maven/</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- </repositories>
-
- <dependencies>
- <!-- ActiveMQ broker, MQTT transport and KahaDB store: test scope only. -->
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-broker</artifactId>
- <version>5.9.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-mqtt</artifactId>
- <version>5.9.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-kahadb-store</artifactId>
- <version>5.9.0</version>
- <scope>test</scope>
- </dependency>
- <!-- storm-core at this module's own version; its scope comes from the provided.scope property, -->
- <!-- which is not defined in this file. -->
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${project.version}</version>
- <scope>${provided.scope}</scope>
- </dependency>
- <!-- Default-scope dependencies: the MQTT client library and commons-lang. -->
- <dependency>
- <groupId>org.fusesource.mqtt-client</groupId>
- <artifactId>mqtt-client</artifactId>
- <version>1.10</version>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <version>2.5</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <!-- Surefire: forks per test, disables assertions and redirects test output to files. -->
- <!-- The include pattern and excluded groups come from properties not defined in this file. -->
- <plugin>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <forkMode>perTest</forkMode>
- <enableAssertions>false</enableAssertions>
- <redirectTestOutputToFile>true</redirectTestOutputToFile>
- <excludedGroups>${java.unit.test.exclude}</excludedGroups>
- <includes>
- <include>${java.unit.test.include}</include>
- </includes>
- <argLine>-Djava.net.preferIPv4Stack=true -Xmx1536m</argLine>
- </configuration>
- </plugin>
- <!-- Failsafe: same fork/assertion/output/argLine settings as surefire above; its version is -->
- <!-- taken from the maven-surefire.version property. Bound to integration-test and verify. -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- <version>${maven-surefire.version}</version>
- <configuration>
- <forkMode>perTest</forkMode>
- <enableAssertions>false</enableAssertions>
- <redirectTestOutputToFile>true</redirectTestOutputToFile>
- <includes>
- <include>${java.integration.test.include}</include>
- </includes>
- <groups>${java.integration.test.group}</groups> <!-- set in the integration-test profile -->
- <argLine>-Djava.net.preferIPv4Stack=true -Xmx1536m</argLine>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>integration-test</goal>
- <goal>verify</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttLogger.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttLogger.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttLogger.java
deleted file mode 100644
index 3af73fd..0000000
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttLogger.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mqtt;
-
-import org.fusesource.mqtt.client.Tracer;
-import org.fusesource.mqtt.codec.MQTTFrame;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link Tracer} implementation that forwards the MQTT client's debug traces to an SLF4J logger.
- */
-public class MqttLogger extends Tracer {
-    private static final Logger LOG = LoggerFactory.getLogger(MqttLogger.class);
-
-    /**
-     * Logs a trace message at DEBUG level.
-     *
-     * <p>The message is formatted only when DEBUG output is enabled, so no
-     * {@link String#format(String, Object...)} work is done for traces that would be discarded.
-     *
-     * @param message a {@link String#format(String, Object...)} pattern
-     * @param args the arguments referenced by the pattern
-     */
-    @Override
-    public void debug(String message, Object... args) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(message, args));
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessage.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessage.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessage.java
deleted file mode 100644
index 5436dda..0000000
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessage.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mqtt;
-
-/**
- * Represents an MQTT Message consisting of a topic string (e.g. "/users/ptgoetz/office/thermostat")
- * and a byte array message/payload.
- *
- * <p>Both values are assigned once in the constructor. The payload array is stored and returned
- * as-is (no defensive copy), so callers share the same array instance.
- */
-public class MqttMessage {
-    private final String topic;
-    private final byte[] message;
-
-    /**
-     * Creates a message for the given topic.
-     *
-     * @param topic the MQTT topic name
-     * @param payload the raw message bytes; the array is kept by reference
-     */
-    public MqttMessage(String topic, byte[] payload){
-        this.topic = topic;
-        this.message = payload;
-    }
-
-    /**
-     * Returns the payload.
-     *
-     * @return the same byte array instance that was passed to the constructor
-     */
-    public byte[] getMessage(){
-        return this.message;
-    }
-
-    /**
-     * Returns the topic.
-     *
-     * @return the MQTT topic name passed to the constructor
-     */
-    public String getTopic(){
-        return this.topic;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
deleted file mode 100644
index c6173f4..0000000
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mqtt;
-
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-import java.io.Serializable;
-
-/**
- * Converts an incoming {@link MqttMessage} (an MQTT topic name plus a byte array payload)
- * into the {@link Values} of a Storm Tuple, and declares the matching output fields.
- *
- * The interface extends {@link Serializable}, so implementations must be serializable.
- */
-public interface MqttMessageMapper extends Serializable {
- /**
- * Convert a {@code MqttMessage} to a set of Values that can be emitted as a Storm Tuple.
- *
- * @param message the MQTT message to convert.
- * @return Values representing a Storm Tuple.
- */
- Values toValues(MqttMessage message);
-
- /**
- * Returns the list of output fields this mapper produces.
- * NOTE(review): presumably these must line up, in order, with what toValues() returns -
- * confirm against the caller.
- *
- * @return the list of output fields this mapper produces.
- */
- Fields outputFields();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
deleted file mode 100644
index c46c069..0000000
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mqtt;
-
-
-import org.apache.storm.tuple.ITuple;
-
-import java.io.Serializable;
-
-/**
- * Converts a Storm tuple into the {@link MqttMessage} (topic plus payload) to publish.
- *
- * The interface extends {@link Serializable}, so implementations must be serializable.
- */
-public interface MqttTupleMapper extends Serializable{
-
- /**
- * Converts a tuple to a {@link MqttMessage}.
- *
- * @param tuple the incoming tuple
- * @return the message to publish
- */
- MqttMessage toMessage(ITuple tuple);
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
deleted file mode 100644
index f6ca1bf..0000000
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mqtt.bolt;
-
-import org.apache.storm.Config;
-import org.apache.storm.mqtt.MqttMessage;
-import org.apache.storm.mqtt.common.MqttOptions;
-import org.apache.storm.mqtt.MqttTupleMapper;
-import org.apache.storm.mqtt.common.MqttPublisher;
-import org.apache.storm.mqtt.common.SslUtils;
-import org.apache.storm.mqtt.ssl.KeyStoreLoader;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-
-public class MqttBolt extends BaseTickTupleAwareRichBolt {
- private static final Logger LOG = LoggerFactory.getLogger(MqttBolt.class);
- private MqttTupleMapper mapper;
- private transient MqttPublisher publisher;
- private boolean retain = false;
- private transient OutputCollector collector;
- private MqttOptions options;
- private KeyStoreLoader keyStoreLoader;
- private transient String topologyName;
-
-
- public MqttBolt(MqttOptions options, MqttTupleMapper mapper){
- this(options, mapper, null, false);
- }
-
- public MqttBolt(MqttOptions options, MqttTupleMapper mapper, boolean retain){
- this(options, mapper, null, retain);
- }
-
- public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader){
- this(options, mapper, keyStoreLoader, false);
- }
-
- public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader, boolean retain){
- this.options = options;
- this.mapper = mapper;
- this.retain = retain;
- this.keyStoreLoader = keyStoreLoader;
- // the following code is duplicated in the constructor of MqttPublisher
- // we reproduce it here so we fail on the client side if SSL is misconfigured, rather than when the topology
- // is deployed to the cluster
- SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader);
- }
-
- @Override
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME);
- this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, this.retain);
- try {
- this.publisher.connectMqtt(this.topologyName + "-" + context.getThisComponentId() + "-" + context.getThisTaskId());
- } catch (Exception e) {
- LOG.error("Unable to connect to MQTT Broker.", e);
- throw new RuntimeException("Unable to connect to MQTT Broker.", e);
- }
- }
-
- @Override
- protected void process(Tuple input) {
- MqttMessage message = this.mapper.toMessage(input);
- try {
- this.publisher.publish(message);
- this.collector.ack(input);
- } catch (Exception e) {
- LOG.warn("Error publishing MQTT message. Failing tuple.", e);
- // should we fail the tuple or kill the worker?
- collector.reportError(e);
- collector.fail(input);
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // this bolt does not emit tuples
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
deleted file mode 100644
index 2b09d6e..0000000
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mqtt.common;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * MQTT Configuration Options.
- *
- * <p>A mutable, serializable bean holding the broker URL, topics, Will settings, reconnect
- * policy, credentials and QoS. Instances can be configured through the setters or assembled
- * with the nested {@link Builder}.
- */
-public class MqttOptions implements Serializable {
-    private String url = "tcp://localhost:1883";
-    private List<String> topics = null;
-    private boolean cleanConnection = false;
-
-    private String willTopic;
-    private String willPayload;
-    private int willQos = 1;
-    private boolean willRetain = false;
-
-    private long reconnectDelay = 10;
-    private long reconnectDelayMax = 30*1000;
-    private double reconnectBackOffMultiplier = 2.0;
-    private long reconnectAttemptsMax = -1;
-    private long connectAttemptsMax = -1;
-
-    private String userName = "";
-    private String password = "";
-
-    private int qos = 1;
-
-    public String getUrl() {
-        return url;
-    }
-
-    /**
-     * Sets the url for connecting to the MQTT broker.
-     *
-     * Default: {@code tcp://localhost:1883}
-     *
-     * @param url the broker URL
-     */
-    public void setUrl(String url) {
-        this.url = url;
-    }
-
-    public List<String> getTopics() {
-        return topics;
-    }
-
-    /**
-     * A list of MQTT topics to subscribe to. Defaults to {@code null}.
-     *
-     * @param topics the topic names; the list is stored by reference
-     */
-    public void setTopics(List<String> topics) {
-        this.topics = topics;
-    }
-
-    public boolean isCleanConnection() {
-        return cleanConnection;
-    }
-
-    /**
-     * Set to false if you want the MQTT server to persist topic subscriptions and ack positions across client sessions.
-     * Defaults to false.
-     *
-     * @param cleanConnection whether to request a clean session
-     */
-    public void setCleanConnection(boolean cleanConnection) {
-        this.cleanConnection = cleanConnection;
-    }
-
-    public String getWillTopic() {
-        return willTopic;
-    }
-
-    /**
-     * If set the server will publish the client's Will message to the specified topic if the client has an unexpected
-     * disconnection.
-     *
-     * @param willTopic the topic the Will message is published to
-     */
-    public void setWillTopic(String willTopic) {
-        this.willTopic = willTopic;
-    }
-
-    public String getWillPayload() {
-        return willPayload;
-    }
-
-    /**
-     * The Will message to send. No payload is set by default.
-     *
-     * @param willPayload the Will message body
-     */
-    public void setWillPayload(String willPayload) {
-        this.willPayload = willPayload;
-    }
-
-    public long getReconnectDelay() {
-        return reconnectDelay;
-    }
-
-    /**
-     * How long to wait in ms before the first reconnect attempt. Defaults to 10.
-     *
-     * @param reconnectDelay the initial delay in milliseconds
-     */
-    public void setReconnectDelay(long reconnectDelay) {
-        this.reconnectDelay = reconnectDelay;
-    }
-
-    public long getReconnectDelayMax() {
-        return reconnectDelayMax;
-    }
-
-    /**
-     * The maximum amount of time in ms to wait between reconnect attempts. Defaults to 30,000.
-     *
-     * @param reconnectDelayMax the maximum delay in milliseconds
-     */
-    public void setReconnectDelayMax(long reconnectDelayMax) {
-        this.reconnectDelayMax = reconnectDelayMax;
-    }
-
-    public double getReconnectBackOffMultiplier() {
-        return reconnectBackOffMultiplier;
-    }
-
-    /**
-     * The exponential backoff multiplier used between reconnect attempts. Set to 1 to disable exponential backoff.
-     * Defaults to 2.
-     *
-     * @param reconnectBackOffMultiplier the backoff multiplier
-     */
-    public void setReconnectBackOffMultiplier(double reconnectBackOffMultiplier) {
-        this.reconnectBackOffMultiplier = reconnectBackOffMultiplier;
-    }
-
-    public long getReconnectAttemptsMax() {
-        return reconnectAttemptsMax;
-    }
-
-    /**
-     * The maximum number of reconnect attempts before an error is reported back to the client after a server
-     * connection had previously been established. Set to -1 to use unlimited attempts. Defaults to -1.
-     *
-     * @param reconnectAttemptsMax the reconnect attempt limit, or -1 for no limit
-     */
-    public void setReconnectAttemptsMax(long reconnectAttemptsMax) {
-        this.reconnectAttemptsMax = reconnectAttemptsMax;
-    }
-
-    public long getConnectAttemptsMax() {
-        return connectAttemptsMax;
-    }
-
-    /**
-     * The maximum number of connect attempts before an error is reported back to the client on the first attempt by
-     * the client to connect to a server. Set to -1 to use unlimited attempts. Defaults to -1.
-     *
-     * @param connectAttemptsMax the initial connect attempt limit, or -1 for no limit
-     */
-    public void setConnectAttemptsMax(long connectAttemptsMax) {
-        this.connectAttemptsMax = connectAttemptsMax;
-    }
-
-    public String getUserName() {
-        return userName;
-    }
-
-    /**
-     * The username for authenticated sessions. Defaults to the empty string.
-     *
-     * @param userName the user name
-     */
-    public void setUserName(String userName) {
-        this.userName = userName;
-    }
-
-    public String getPassword() {
-        return password;
-    }
-
-    /**
-     * The password for authenticated sessions. Defaults to the empty string.
-     *
-     * @param password the password
-     */
-    public void setPassword(String password) {
-        this.password = password;
-    }
-
-    public int getQos(){
-        return this.qos;
-    }
-
-    /**
-     * Sets the quality of service to use for MQTT messages. Defaults to 1 (at least once).
-     *
-     * @param qos the QoS level, 0, 1 or 2
-     * @throws IllegalArgumentException if {@code qos} is outside 0..2
-     */
-    public void setQos(int qos){
-        if(qos < 0 || qos > 2){
-            throw new IllegalArgumentException("MQTT QoS must be >= 0 and <= 2");
-        }
-        this.qos = qos;
-    }
-
-    public int getWillQos(){
-        return this.willQos;
-    }
-
-    /**
-     * Sets the quality of service to use for the MQTT Will message. Defaults to 1 (at least once).
-     *
-     * @param qos the QoS level, 0, 1 or 2
-     * @throws IllegalArgumentException if {@code qos} is outside 0..2
-     */
-    public void setWillQos(int qos){
-        if(qos < 0 || qos > 2){
-            throw new IllegalArgumentException("MQTT Will QoS must be >= 0 and <= 2");
-        }
-        this.willQos = qos;
-    }
-
-    public boolean getWillRetain(){
-        return this.willRetain;
-    }
-
-    /**
-     * Set to true if you want the Will message to be published with the retain option.
-     *
-     * @param retain whether the Will message is retained
-     */
-    public void setWillRetain(boolean retain){
-        this.willRetain = retain;
-    }
-
-    /**
-     * Fluent builder for {@link MqttOptions}. Every method returns the builder itself, and
-     * {@link #build()} returns a fresh snapshot each time it is called.
-     */
-    public static class Builder {
-        private final MqttOptions options = new MqttOptions();
-
-        public Builder url(String url) {
-            this.options.url = url;
-            return this;
-        }
-
-
-        public Builder topics(List<String> topics) {
-            this.options.topics = topics;
-            return this;
-        }
-
-        public Builder cleanConnection(boolean cleanConnection) {
-            this.options.cleanConnection = cleanConnection;
-            return this;
-        }
-
-        public Builder willTopic(String willTopic) {
-            this.options.willTopic = willTopic;
-            return this;
-        }
-
-        public Builder willPayload(String willPayload) {
-            this.options.willPayload = willPayload;
-            return this;
-        }
-
-        public Builder willRetain(boolean retain){
-            this.options.willRetain = retain;
-            return this;
-        }
-
-        /**
-         * @throws IllegalArgumentException if {@code qos} is outside 0..2
-         */
-        public Builder willQos(int qos){
-            this.options.setWillQos(qos);
-            return this;
-        }
-
-        public Builder reconnectDelay(long reconnectDelay) {
-            this.options.reconnectDelay = reconnectDelay;
-            return this;
-        }
-
-        public Builder reconnectDelayMax(long reconnectDelayMax) {
-            this.options.reconnectDelayMax = reconnectDelayMax;
-            return this;
-        }
-
-        public Builder reconnectBackOffMultiplier(double reconnectBackOffMultiplier) {
-            this.options.reconnectBackOffMultiplier = reconnectBackOffMultiplier;
-            return this;
-        }
-
-        public Builder reconnectAttemptsMax(long reconnectAttemptsMax) {
-            this.options.reconnectAttemptsMax = reconnectAttemptsMax;
-            return this;
-        }
-
-        public Builder connectAttemptsMax(long connectAttemptsMax) {
-            this.options.connectAttemptsMax = connectAttemptsMax;
-            return this;
-        }
-
-        public Builder userName(String userName) {
-            this.options.userName = userName;
-            return this;
-        }
-
-        public Builder password(String password) {
-            this.options.password = password;
-            return this;
-        }
-
-        /**
-         * @throws IllegalArgumentException if {@code qos} is outside 0..2
-         */
-        public Builder qos(int qos){
-            this.options.setQos(qos);
-            return this;
-        }
-
-        /**
-         * Builds the options.
-         *
-         * <p>A new instance is returned on every call, so continuing to use this builder
-         * afterwards does not alter options that were already built. The topics list is
-         * carried over by reference, not copied.
-         *
-         * @return a new {@link MqttOptions} holding the builder's current values
-         */
-        public MqttOptions build() {
-            MqttOptions built = new MqttOptions();
-            built.url = this.options.url;
-            built.topics = this.options.topics;
-            built.cleanConnection = this.options.cleanConnection;
-            built.willTopic = this.options.willTopic;
-            built.willPayload = this.options.willPayload;
-            built.willQos = this.options.willQos;
-            built.willRetain = this.options.willRetain;
-            built.reconnectDelay = this.options.reconnectDelay;
-            built.reconnectDelayMax = this.options.reconnectDelayMax;
-            built.reconnectBackOffMultiplier = this.options.reconnectBackOffMultiplier;
-            built.reconnectAttemptsMax = this.options.reconnectAttemptsMax;
-            built.connectAttemptsMax = this.options.connectAttemptsMax;
-            built.userName = this.options.userName;
-            built.password = this.options.password;
-            built.qos = this.options.qos;
-            return built;
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
deleted file mode 100644
index 9b36b78..0000000
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mqtt.common;
-
-
-import org.apache.storm.mqtt.MqttLogger;
-import org.apache.storm.mqtt.MqttMessage;
-import org.apache.storm.mqtt.ssl.KeyStoreLoader;
-import org.fusesource.mqtt.client.BlockingConnection;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.QoS;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-
-/**
- * Publishes {@link MqttMessage}s to an MQTT broker over a blocking connection.
- *
- * <p>{@link #connectMqtt(String)} must be called before {@link #publish(MqttMessage)}.
- */
-public class MqttPublisher {
-    private static final Logger LOG = LoggerFactory.getLogger(MqttPublisher.class);
-
-    private final MqttOptions options;
-    // Created by connectMqtt(); null until then.
-    private transient BlockingConnection connection;
-    private final KeyStoreLoader keyStoreLoader;
-    private final QoS qos;
-    private final boolean retain;
-
-
-    /**
-     * Creates a publisher without a key store loader and with the retain flag off.
-     */
-    public MqttPublisher(MqttOptions options){
-        this(options, null, false);
-    }
-
-    /**
-     * Creates a publisher without a key store loader, using the given retain flag.
-     */
-    public MqttPublisher(MqttOptions options, boolean retain){
-        this(options, null, retain);
-    }
-
-    /**
-     * Creates a publisher, checks the SSL configuration against the options' URL and resolves
-     * the QoS level from the options.
-     *
-     * @param options broker connection options
-     * @param keyStoreLoader key store source passed to the SSL check and client setup; may be null
-     * @param retain retain flag used for every published message
-     */
-    public MqttPublisher(MqttOptions options, KeyStoreLoader keyStoreLoader, boolean retain){
-        this.retain = retain;
-        this.options = options;
-        this.keyStoreLoader = keyStoreLoader;
-        SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader);
-        this.qos = MqttUtils.qosFromInt(this.options.getQos());
-    }
-
-    /**
-     * Publishes the message's payload to its topic with this publisher's QoS and retain flag.
-     *
-     * @param message the message to publish
-     * @throws IllegalStateException if {@link #connectMqtt(String)} has not been called yet
-     * @throws Exception if the underlying connection fails to publish
-     */
-    public void publish(MqttMessage message) throws Exception {
-        if (this.connection == null) {
-            // Fail with a clear message rather than a bare NullPointerException.
-            throw new IllegalStateException(
-                    "Not connected to an MQTT broker; call connectMqtt() before publish().");
-        }
-        this.connection.publish(message.getTopic(), message.getMessage(), this.qos, this.retain);
-    }
-
-    /**
-     * Configures an MQTT client from the options and opens a blocking connection.
-     *
-     * @param clientId the MQTT client id to connect with
-     * @throws Exception if client configuration or the connection attempt fails
-     */
-    public void connectMqtt(String clientId) throws Exception {
-        MQTT client = MqttUtils.configureClient(this.options, clientId, this.keyStoreLoader);
-        this.connection = client.blockingConnection();
-        this.connection.connect();
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
deleted file mode 100644
index 4ca0145..0000000
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mqtt.common;
-
-
-import org.apache.storm.mqtt.MqttLogger;
-import org.apache.storm.mqtt.ssl.KeyStoreLoader;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.QoS;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-
-/**
- * Static helpers for turning {@link MqttOptions} into a configured {@link MQTT} client.
- */
-public class MqttUtils {
-    private static final Logger LOG = LoggerFactory.getLogger(MqttUtils.class);
-
-    private MqttUtils(){}
-
-    /**
-     * Maps a numeric QoS level to the client library's {@link QoS} constant.
-     *
-     * @param i the QoS level: 0 (at most once), 1 (at least once) or 2 (exactly once)
-     * @return the matching {@link QoS} value
-     * @throws IllegalArgumentException if {@code i} is not 0, 1 or 2
-     */
-    public static QoS qosFromInt(int i){
-        switch (i) {
-            case 0:
-                return QoS.AT_MOST_ONCE;
-            case 1:
-                return QoS.AT_LEAST_ONCE;
-            case 2:
-                return QoS.EXACTLY_ONCE;
-            default:
-                throw new IllegalArgumentException(i + " is not a valid MQTT QoS.");
-        }
-    }
-
-
-    /**
-     * Creates an {@link MQTT} client and applies the given options to it: host, client id,
-     * clean-session flag, reconnect policy, credentials and tracer. An SSL context is installed
-     * for every URL scheme other than {@code tcp}. The Will settings are applied only when both
-     * a Will topic and a Will payload are configured.
-     *
-     * @param options the connection options to apply
-     * @param clientId the MQTT client id
-     * @param keyStoreLoader key store source used to build the SSL context for non-tcp schemes
-     * @return the configured, not yet connected, client
-     * @throws Exception if the host cannot be set or the SSL context cannot be created
-     */
-    public static MQTT configureClient(MqttOptions options, String clientId, KeyStoreLoader keyStoreLoader)
-            throws Exception{
-
-        MQTT client = new MQTT();
-        URI uri = URI.create(options.getUrl());
-
-        client.setHost(uri);
-        // equalsIgnoreCase keeps the comparison independent of the JVM's default locale.
-        if(!uri.getScheme().equalsIgnoreCase("tcp")){
-            client.setSslContext(SslUtils.sslContext(uri.getScheme(), keyStoreLoader));
-        }
-        client.setClientId(clientId);
-        LOG.info("MQTT ClientID: {}", client.getClientId().toString());
-        client.setCleanSession(options.isCleanConnection());
-
-        client.setReconnectDelay(options.getReconnectDelay());
-        client.setReconnectDelayMax(options.getReconnectDelayMax());
-        client.setReconnectBackOffMultiplier(options.getReconnectBackOffMultiplier());
-        client.setConnectAttemptsMax(options.getConnectAttemptsMax());
-        client.setReconnectAttemptsMax(options.getReconnectAttemptsMax());
-
-
-        client.setUserName(options.getUserName());
-        client.setPassword(options.getPassword());
-        client.setTracer(new MqttLogger());
-
-        if(options.getWillTopic() != null && options.getWillPayload() != null){
-            QoS qos = MqttUtils.qosFromInt(options.getWillQos());
-            client.setWillQos(qos);
-            client.setWillTopic(options.getWillTopic());
-            client.setWillMessage(options.getWillPayload());
-            client.setWillRetain(options.getWillRetain());
-        }
-        return client;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/SslUtils.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
deleted file mode 100644
index 681fc1d..0000000
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mqtt.common;
-
-
-import org.apache.storm.mqtt.ssl.KeyStoreLoader;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-import java.net.URI;
-import java.security.KeyStore;
-
-public class SslUtils {
- private SslUtils(){}
-
- public static void checkSslConfig(String url, KeyStoreLoader loader){
- URI uri = URI.create(url);
- String scheme = uri.getScheme().toLowerCase();
- if(!(scheme.equals("tcp") || scheme.startsWith("tls") || scheme.startsWith("ssl"))){
- throw new IllegalArgumentException("Unrecognized URI scheme: " + scheme);
- }
- if(!scheme.equalsIgnoreCase("tcp") && loader == null){
- throw new IllegalStateException("A TLS/SSL MQTT URL was specified, but no KeyStoreLoader configured. " +
- "A KeyStoreLoader implementation is required when using TLS/SSL.");
- }
- }
-
- public static SSLContext sslContext(String scheme, KeyStoreLoader keyStoreLoader) throws Exception {
- KeyStore ks = KeyStore.getInstance("JKS");
- ks.load(keyStoreLoader.keyStoreInputStream(), keyStoreLoader.keyStorePassword().toCharArray());
-
- KeyStore ts = KeyStore.getInstance("JKS");
- ts.load(keyStoreLoader.trustStoreInputStream(), keyStoreLoader.trustStorePassword().toCharArray());
-
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- kmf.init(ks, keyStoreLoader.keyPassword().toCharArray());
-
- TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
- tmf.init(ts);
-
- SSLContext sc = SSLContext.getInstance(scheme.toUpperCase());
- TrustManager[] trustManagers = tmf.getTrustManagers();
- sc.init(kmf.getKeyManagers(), trustManagers, null);
-
- return sc;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
deleted file mode 100644
index a19fce4..0000000
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mqtt.mappers;
-
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.mqtt.MqttMessage;
-import org.apache.storm.mqtt.MqttMessageMapper;
-
-
-public class ByteArrayMessageMapper implements MqttMessageMapper {
- public Values toValues(MqttMessage message) {
- return new Values(message.getTopic(), message.getMessage());
- }
-
- public Fields outputFields() {
- return new Fields("topic", "message");
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
deleted file mode 100644
index e5f309b..0000000
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mqtt.mappers;
-
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.mqtt.MqttMessage;
-import org.apache.storm.mqtt.MqttMessageMapper;
-
-/**
- * Given a String topic and byte[] message, emits a tuple with fields
- * "topic" and "message", both of which are Strings.
- */
-public class StringMessageMapper implements MqttMessageMapper {
- public Values toValues(MqttMessage message) {
- return new Values(message.getTopic(), new String(message.getMessage()));
- }
-
- public Fields outputFields() {
- return new Fields("topic", "message");
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java
deleted file mode 100644
index 08348c9..0000000
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mqtt.spout;
-
-import org.apache.commons.lang.builder.EqualsBuilder;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.storm.mqtt.MqttMessage;
-
-/**
- * Represents an MQTT Message consisting of a topic string (e.g. "/users/ptgoetz/office/thermostat")
- * and a byte array message/payload.
- *
- */
-class AckableMessage {
- private String topic;
- private byte[] message;
- private Runnable ack;
-
- AckableMessage(String topic, byte[] message, Runnable ack){
- this.topic = topic;
- this.message = message;
- this.ack = ack;
- }
-
- public MqttMessage getMessage(){
- return new MqttMessage(this.topic, this.message);
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder(71, 123)
- .append(this.topic)
- .append(this.message)
- .toHashCode();
- }
-
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null) { return false; }
- if (obj == this) { return true; }
- if (obj.getClass() != getClass()) {
- return false;
- }
- AckableMessage tm = (AckableMessage)obj;
- return new EqualsBuilder()
- .appendSuper(super.equals(obj))
- .append(this.topic, tm.topic)
- .append(this.message, tm.message)
- .isEquals();
- }
-
- Runnable ack(){
- return this.ack;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
deleted file mode 100644
index 7f10cc5..0000000
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mqtt.spout;
-
-import org.apache.storm.Config;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.mqtt.MqttMessageMapper;
-import org.apache.storm.mqtt.common.MqttOptions;
-import org.apache.storm.mqtt.common.MqttUtils;
-import org.apache.storm.mqtt.common.SslUtils;
-import org.apache.storm.mqtt.ssl.KeyStoreLoader;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.UTF8Buffer;
-import org.fusesource.mqtt.client.Callback;
-import org.fusesource.mqtt.client.CallbackConnection;
-import org.fusesource.mqtt.client.Listener;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.QoS;
-import org.fusesource.mqtt.client.Topic;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class MqttSpout implements IRichSpout, Listener {
- private static final Logger LOG = LoggerFactory.getLogger(MqttSpout.class);
-
- private String topologyName;
-
-
- private CallbackConnection connection;
-
- protected transient SpoutOutputCollector collector;
- protected transient TopologyContext context;
- protected transient LinkedBlockingQueue<AckableMessage> incoming;
- protected transient HashMap<Long, AckableMessage> pending;
- private transient Map conf;
- protected MqttMessageMapper type;
- protected MqttOptions options;
- protected KeyStoreLoader keyStoreLoader;
-
- private boolean mqttConnected = false;
- private boolean mqttConnectFailed = false;
-
-
- private Long sequence = Long.MIN_VALUE;
-
- private Long nextId(){
- this.sequence++;
- if(this.sequence == Long.MAX_VALUE){
- this.sequence = Long.MIN_VALUE;
- }
- return this.sequence;
- }
-
- protected MqttSpout(){}
-
- public MqttSpout(MqttMessageMapper type, MqttOptions options){
- this(type, options, null);
- }
-
- public MqttSpout(MqttMessageMapper type, MqttOptions options, KeyStoreLoader keyStoreLoader){
- this.type = type;
- this.options = options;
- this.keyStoreLoader = keyStoreLoader;
- SslUtils.checkSslConfig(this.options.getUrl(), this.keyStoreLoader);
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(this.type.outputFields());
- }
-
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME);
-
- this.collector = collector;
- this.context = context;
- this.conf = conf;
-
- this.incoming = new LinkedBlockingQueue<>();
- this.pending = new HashMap<>();
-
- try {
- connectMqtt();
- } catch (Exception e) {
- this.collector.reportError(e);
- throw new RuntimeException("MQTT Connection failed.", e);
- }
-
- }
-
- private void connectMqtt() throws Exception {
- String clientId = this.topologyName + "-" + this.context.getThisComponentId() + "-" +
- this.context.getThisTaskId();
-
- MQTT client = MqttUtils.configureClient(this.options, clientId, this.keyStoreLoader);
- this.connection = client.callbackConnection();
- this.connection.listener(this);
- this.connection.connect(new ConnectCallback());
-
- while(!this.mqttConnected && !this.mqttConnectFailed){
- LOG.info("Waiting for connection...");
- Thread.sleep(500);
- }
-
- if(this.mqttConnected){
- List<String> topicList = this.options.getTopics();
- Topic[] topics = new Topic[topicList.size()];
- QoS qos = MqttUtils.qosFromInt(this.options.getQos());
- for(int i = 0;i < topicList.size();i++){
- topics[i] = new Topic(topicList.get(i), qos);
- }
- connection.subscribe(topics, new SubscribeCallback());
- }
- }
-
-
-
- public void close() {
- this.connection.disconnect(new DisconnectCallback());
- }
-
- public void activate() {
- }
-
- public void deactivate() {
- }
-
- /**
- * When this method is called, Storm is requesting that the Spout emit tuples to the
- * output collector. This method should be non-blocking, so if the Spout has no tuples
- * to emit, this method should return. nextTuple, ack, and fail are all called in a tight
- * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous
- * to have nextTuple sleep for a short amount of time (like a single millisecond)
- * so as not to waste too much CPU.
- */
- public void nextTuple() {
- AckableMessage tm = this.incoming.poll();
- if(tm != null){
- Long id = nextId();
- this.collector.emit(this.type.toValues(tm.getMessage()), id);
- this.pending.put(id, tm);
- } else {
- Thread.yield();
- }
-
- }
-
- /**
- * Storm has determined that the tuple emitted by this spout with the msgId identifier
- * has been fully processed. Typically, an implementation of this method will take that
- * message off the queue and prevent it from being replayed.
- *
- * @param msgId
- */
- public void ack(Object msgId) {
- AckableMessage msg = this.pending.remove(msgId);
- this.connection.getDispatchQueue().execute(msg.ack());
- }
-
- /**
- * The tuple emitted by this spout with the msgId identifier has failed to be
- * fully processed. Typically, an implementation of this method will put that
- * message back on the queue to be replayed at a later time.
- *
- * @param msgId
- */
- public void fail(Object msgId) {
- try {
- this.incoming.put(this.pending.remove(msgId));
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while re-queueing message.", e);
- }
- }
-
-
- // ################# Listener Implementation ######################
- public void onConnected() {
- // this gets called repeatedly for no apparent reason, don't do anything
- }
-
- public void onDisconnected() {
- // this gets called repeatedly for no apparent reason, don't do anything
- }
-
- public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
- LOG.debug("Received message: topic={}, payload={}", topic.toString(), new String(payload.toByteArray()));
- try {
- this.incoming.put(new AckableMessage(topic.toString(), payload.toByteArray(), ack));
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while queueing an MQTT message.");
- }
- }
-
- public void onFailure(Throwable throwable) {
- LOG.error("MQTT Connection Failure.", throwable);
- MqttSpout.this.connection.disconnect(new DisconnectCallback());
- throw new RuntimeException("MQTT Connection failure.", throwable);
- }
-
- // ################# Connect Callback Implementation ######################
- private class ConnectCallback implements Callback<Void> {
- public void onSuccess(Void v) {
- LOG.info("MQTT Connected. Subscribing to topic...");
- MqttSpout.this.mqttConnected = true;
- }
-
- public void onFailure(Throwable throwable) {
- LOG.info("MQTT Connection failed.");
- MqttSpout.this.mqttConnectFailed = true;
- }
- }
-
- // ################# Subscribe Callback Implementation ######################
- private class SubscribeCallback implements Callback<byte[]>{
- public void onSuccess(byte[] qos) {
- LOG.info("Subscripton sucessful.");
- }
-
- public void onFailure(Throwable throwable) {
- LOG.error("MQTT Subscripton failed.", throwable);
- throw new RuntimeException("MQTT Subscribe failed.", throwable);
- }
- }
-
- // ################# Subscribe Callback Implementation ######################
- private class DisconnectCallback implements Callback<Void>{
- public void onSuccess(Void aVoid) {
- LOG.info("MQTT Disconnect successful.");
- }
-
- public void onFailure(Throwable throwable) {
- // Disconnects don't fail.
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java
deleted file mode 100644
index 8bca407..0000000
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mqtt.ssl;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-
-/**
- * KeyStoreLoader implementation that uses local files.
- */
-public class DefaultKeyStoreLoader implements KeyStoreLoader {
- private String ksFile = null;
- private String tsFile = null;
- private String keyStorePassword = "";
- private String trustStorePassword = "";
- private String keyPassword = "";
-
- /**
- * Creates a DefaultKeystoreLoader that uses the same file
- * for both the keystore and truststore.
- *
- * @param keystore path to keystore file
- */
- public DefaultKeyStoreLoader(String keystore){
- this.ksFile = keystore;
- }
-
- /**
- * Creates a DefaultKeystoreLoader that uses separate files
- * for the keystore and truststore.
- *
- * @param keystore path to keystore file
- * @param truststore path to truststore file
- */
- public DefaultKeyStoreLoader(String keystore, String truststore){
- this.ksFile = keystore;
- this.tsFile = truststore;
- }
-
- public void setKeyStorePassword(String keyStorePassword) {
- this.keyStorePassword = keyStorePassword;
- }
-
- public void setTrustStorePassword(String trustStorePassword) {
- this.trustStorePassword = trustStorePassword;
- }
-
- public void setKeyPassword(String keyPassword) {
- this.keyPassword = keyPassword;
- }
-
- @Override
- public InputStream keyStoreInputStream() throws FileNotFoundException {
- return new FileInputStream(this.ksFile);
- }
-
- @Override
- public InputStream trustStoreInputStream() throws FileNotFoundException {
- // if no truststore file, assume the truststore is the keystore.
- if(this.tsFile == null){
- return new FileInputStream(this.ksFile);
- } else {
- return new FileInputStream(this.tsFile);
- }
- }
-
- @Override
- public String keyStorePassword() {
- return this.keyStorePassword;
- }
-
- @Override
- public String trustStorePassword() {
- return this.trustStorePassword;
- }
-
- @Override
- public String keyPassword() {
- return this.keyPassword;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java
deleted file mode 100644
index 297efcc..0000000
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mqtt.ssl;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-
-/**
- * Abstraction for loading keystore/truststore data. This allows keystores
- * to be loaded from different sources (File system, HDFS, etc.).
- */
-public interface KeyStoreLoader extends Serializable {
-
- String keyStorePassword();
- String trustStorePassword();
- String keyPassword();
- InputStream keyStoreInputStream() throws IOException;
- InputStream trustStoreInputStream() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java
deleted file mode 100644
index e53c983..0000000
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mqtt.trident;
-
-import org.apache.storm.Config;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.mqtt.MqttMessage;
-import org.apache.storm.mqtt.common.MqttOptions;
-import org.apache.storm.mqtt.MqttTupleMapper;
-import org.apache.storm.mqtt.common.MqttPublisher;
-import org.apache.storm.mqtt.common.SslUtils;
-import org.apache.storm.mqtt.ssl.KeyStoreLoader;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-import java.util.Map;
-
-public class MqttPublishFunction extends BaseFunction {
- private static final Logger LOG = LoggerFactory.getLogger(MqttPublishFunction.class);
- private MqttTupleMapper mapper;
- private transient MqttPublisher publisher;
- private boolean retain = false;
- private transient OutputCollector collector;
- private MqttOptions options;
- private KeyStoreLoader keyStoreLoader;
- private transient String topologyName;
-
-
- public MqttPublishFunction(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader, boolean retain){
- this.options = options;
- this.mapper = mapper;
- this.retain = retain;
- this.keyStoreLoader = keyStoreLoader;
- // the following code is duplicated in the constructor of MqttPublisher
- // we reproduce it here so we fail on the client side if SSL is misconfigured, rather than when the topology
- // is deployed to the cluster
- SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader);
- }
-
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME);
- this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, this.retain);
- try {
- this.publisher.connectMqtt(this.topologyName + "-" + context.getPartitionIndex());
- } catch (Exception e) {
- LOG.error("Unable to connect to MQTT Broker.", e);
- throw new RuntimeException("Unable to connect to MQTT Broker.", e);
- }
- }
-
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- MqttMessage message = this.mapper.toMessage(tuple);
- try {
- this.publisher.publish(message);
- } catch (Exception e) {
- LOG.warn("Error publishing MQTT message. Failing tuple.", e);
- // should we fail the batch or kill the worker?
- throw new FailedException();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java b/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
deleted file mode 100644
index 0dd4d73..0000000
--- a/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mqtt;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.testing.IntegrationTest;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.ITuple;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.storm.mqtt.bolt.MqttBolt;
-import org.apache.storm.mqtt.common.MqttOptions;
-import org.apache.storm.mqtt.common.MqttPublisher;
-import org.apache.storm.mqtt.mappers.StringMessageMapper;
-import org.apache.storm.mqtt.spout.MqttSpout;
-import org.fusesource.mqtt.client.BlockingConnection;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.Message;
-import org.fusesource.mqtt.client.QoS;
-import org.fusesource.mqtt.client.Topic;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.net.URI;
-import java.util.Arrays;
-
-@Category(IntegrationTest.class)
-public class StormMqttIntegrationTest implements Serializable{
- private static final Logger LOG = LoggerFactory.getLogger(StormMqttIntegrationTest.class);
- private static BrokerService broker;
- static boolean spoutActivated = false;
-
- private static final String TEST_TOPIC = "/mqtt-topology";
- private static final String RESULT_TOPIC = "/integration-result";
- private static final String RESULT_PAYLOAD = "Storm MQTT Spout";
-
- public static class TestSpout extends MqttSpout{
- public TestSpout(MqttMessageMapper type, MqttOptions options){
- super(type, options);
- }
-
- @Override
- public void activate() {
- super.activate();
- LOG.info("Spout activated.");
- spoutActivated = true;
- }
- }
-
-
- @AfterClass
- public static void cleanup() throws Exception {
- broker.stop();
- }
-
- @BeforeClass
- public static void start() throws Exception {
- LOG.warn("Starting broker...");
- broker = new BrokerService();
- broker.addConnector("mqtt://localhost:1883");
- broker.setDataDirectory("target");
- broker.start();
- LOG.debug("MQTT broker started");
- }
-
-
- @Test
- public void testMqttTopology() throws Exception {
- MQTT client = new MQTT();
- client.setTracer(new MqttLogger());
- URI uri = URI.create("tcp://localhost:1883");
- client.setHost(uri);
-
- client.setClientId("MQTTSubscriber");
- client.setCleanSession(false);
- BlockingConnection connection = client.blockingConnection();
- connection.connect();
- Topic[] topics = {new Topic("/integration-result", QoS.AT_LEAST_ONCE)};
- byte[] qoses = connection.subscribe(topics);
-
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("test", new Config(), buildMqttTopology());) {
-
- LOG.info("topology started");
- while(!spoutActivated) {
- Thread.sleep(500);
- }
-
- // publish a retained message to the broker
- MqttOptions options = new MqttOptions();
- options.setCleanConnection(false);
- MqttPublisher publisher = new MqttPublisher(options, true);
- publisher.connectMqtt("MqttPublisher");
- publisher.publish(new MqttMessage(TEST_TOPIC, "test".getBytes()));
-
- LOG.info("published message");
-
- Message message = connection.receive();
- LOG.info("Message recieved on topic: {}", message.getTopic());
- LOG.info("Payload: {}", new String(message.getPayload()));
- message.ack();
-
- Assert.assertArrayEquals(message.getPayload(), RESULT_PAYLOAD.getBytes());
- Assert.assertEquals(message.getTopic(), RESULT_TOPIC);
- }
- }
-
- public StormTopology buildMqttTopology(){
- TopologyBuilder builder = new TopologyBuilder();
-
- MqttOptions options = new MqttOptions();
- options.setTopics(Arrays.asList(TEST_TOPIC));
- options.setCleanConnection(false);
- TestSpout spout = new TestSpout(new StringMessageMapper(), options);
-
- MqttBolt bolt = new MqttBolt(options, new MqttTupleMapper() {
- @Override
- public MqttMessage toMessage(ITuple tuple) {
- LOG.info("Received: {}", tuple);
- return new MqttMessage(RESULT_TOPIC, RESULT_PAYLOAD.getBytes());
- }
- });
-
- builder.setSpout("mqtt-spout", spout);
- builder.setBolt("mqtt-bolt", bolt).shuffleGrouping("mqtt-spout");
-
- return builder.createTopology();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/pom.xml b/external/storm-mqtt/pom.xml
index 7b870b2..37c19d7 100644
--- a/external/storm-mqtt/pom.xml
+++ b/external/storm-mqtt/pom.xml
@@ -14,44 +14,112 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>storm</artifactId>
- <groupId>org.apache.storm</groupId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <artifactId>storm-mqtt-parent</artifactId>
- <packaging>pom</packaging>
-
- <name>storm-mqtt-parent</name>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <!-- see comment below... This fixes an annoyance with intellij -->
- <provided.scope>provided</provided.scope>
- </properties>
-
- <profiles>
- <!--
- Hack to make intellij behave.
- If you use intellij, enable this profile in your IDE.
- It should make life easier.
- -->
- <profile>
- <id>intellij</id>
- <properties>
- <provided.scope>compile</provided.scope>
- </properties>
- </profile>
- </profiles>
-
- <modules>
- <module>core</module>
- </modules>
+ <artifactId>storm-mqtt</artifactId>
+ <packaging>jar</packaging>
+
+ <name>storm-mqtt</name>
+
+ <parent>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+
+
+
+ <repositories>
+ <repository>
+ <id>bintray</id>
+ <url>http://dl.bintray.com/andsel/maven/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <version>5.9.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-mqtt</artifactId>
+ <version>5.9.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-kahadb-store</artifactId>
+ <version>5.9.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>${provided.scope}</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.fusesource.mqtt-client</groupId>
+ <artifactId>mqtt-client</artifactId>
+ <version>1.10</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.5</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <forkMode>perTest</forkMode>
+ <enableAssertions>false</enableAssertions>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <excludedGroups>${java.unit.test.exclude}</excludedGroups>
+ <includes>
+ <include>${java.unit.test.include}</include>
+ </includes>
+ <argLine>-Djava.net.preferIPv4Stack=true -Xmx1536m</argLine>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>${maven-surefire.version}</version>
+ <configuration>
+ <forkMode>perTest</forkMode>
+ <enableAssertions>false</enableAssertions>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <includes>
+ <include>${java.integration.test.include}</include>
+ </includes>
+ <groups>${java.integration.test.group}</groups> <!-- set in the integration-test profile -->
+ <argLine>-Djava.net.preferIPv4Stack=true -Xmx1536m</argLine>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java
new file mode 100644
index 0000000..3af73fd
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt;
+
+import org.fusesource.mqtt.client.Tracer;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper around SLF4J logger that allows MQTT messages to be logged.
+ */
+public class MqttLogger extends Tracer {
+ private static final Logger LOG = LoggerFactory.getLogger(MqttLogger.class);
+
+ @Override
+ public void debug(String message, Object... args) {
+ LOG.debug(String.format(message, args));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java
new file mode 100644
index 0000000..5436dda
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt;
+
+/**
+ * Represents an MQTT Message consisting of a topic string (e.g. "/users/ptgoetz/office/thermostat")
+ * and a byte array message/payload.
+ *
+ */
+public class MqttMessage {
+ private String topic;
+ private byte[] message;
+
+
+ public MqttMessage(String topic, byte[] payload){
+ this.topic = topic;
+ this.message = payload;
+ }
+ public byte[] getMessage(){
+ return this.message;
+ }
+
+ public String getTopic(){
+ return this.topic;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
new file mode 100644
index 0000000..c6173f4
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.io.Serializable;
+
+/**
+ * Represents an object that converts an MQTT message (a topic name and a
+ * byte array payload) into the Values of a Storm Tuple.
+ */
+public interface MqttMessageMapper extends Serializable {
+ /**
+ * Converts an {@code MqttMessage} to a set of Values that can be emitted as a Storm Tuple.
+ *
+ * @param message An MQTT Message.
+ * @return Values representing a Storm Tuple.
+ */
+ Values toValues(MqttMessage message);
+
+ /**
+ * Returns the list of output fields this Mapper produces.
+ *
+ * @return the list of output fields this mapper produces.
+ */
+ Fields outputFields();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
new file mode 100644
index 0000000..c46c069
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt;
+
+
+import org.apache.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+/**
+ * Given a Tuple, converts it to an MQTT message.
+ */
+public interface MqttTupleMapper extends Serializable{
+
+ /**
+ * Converts a Tuple to an MqttMessage.
+ * @param tuple the incoming tuple
+ * @return the message to publish
+ */
+ MqttMessage toMessage(ITuple tuple);
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
new file mode 100644
index 0000000..f6ca1bf
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.mqtt.MqttMessage;
+import org.apache.storm.mqtt.common.MqttOptions;
+import org.apache.storm.mqtt.MqttTupleMapper;
+import org.apache.storm.mqtt.common.MqttPublisher;
+import org.apache.storm.mqtt.common.SslUtils;
+import org.apache.storm.mqtt.ssl.KeyStoreLoader;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+
+public class MqttBolt extends BaseTickTupleAwareRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(MqttBolt.class);
+ private MqttTupleMapper mapper;
+ private transient MqttPublisher publisher;
+ private boolean retain = false;
+ private transient OutputCollector collector;
+ private MqttOptions options;
+ private KeyStoreLoader keyStoreLoader;
+ private transient String topologyName;
+
+
+ public MqttBolt(MqttOptions options, MqttTupleMapper mapper){
+ this(options, mapper, null, false);
+ }
+
+ public MqttBolt(MqttOptions options, MqttTupleMapper mapper, boolean retain){
+ this(options, mapper, null, retain);
+ }
+
+ public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader){
+ this(options, mapper, keyStoreLoader, false);
+ }
+
+ public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader, boolean retain){
+ this.options = options;
+ this.mapper = mapper;
+ this.retain = retain;
+ this.keyStoreLoader = keyStoreLoader;
+ // The following check is duplicated in the constructor of MqttPublisher.
+ // We repeat it here so that we fail on the client side if SSL is misconfigured,
+ // rather than when the topology is deployed to the cluster.
+ SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader);
+ }
+
+ @Override
+ public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME);
+ this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, this.retain);
+ try {
+ this.publisher.connectMqtt(this.topologyName + "-" + context.getThisComponentId() + "-" + context.getThisTaskId());
+ } catch (Exception e) {
+ LOG.error("Unable to connect to MQTT Broker.", e);
+ throw new RuntimeException("Unable to connect to MQTT Broker.", e);
+ }
+ }
+
+ @Override
+ protected void process(Tuple input) {
+ MqttMessage message = this.mapper.toMessage(input);
+ try {
+ this.publisher.publish(message);
+ this.collector.ack(input);
+ } catch (Exception e) {
+ LOG.warn("Error publishing MQTT message. Failing tuple.", e);
+ // should we fail the tuple or kill the worker?
+ collector.reportError(e);
+ collector.fail(input);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // this bolt does not emit tuples
+ }
+}