You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/22 16:59:48 UTC
[08/23] ignite git commit: IGNITE-535 Finish MQTT Streamer docs and
tests. Upgrade latter to AMQ 5.12.0.
IGNITE-535 Finish MQTT Streamer docs and tests. Upgrade latter to AMQ 5.12.0.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/296dd6e7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/296dd6e7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/296dd6e7
Branch: refs/heads/ignite-1513-final
Commit: 296dd6e7d86fe6d0914a9fbf8062632c04e4d22c
Parents: f03f3a3
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 17:24:44 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 17:24:44 2015 +0100
----------------------------------------------------------------------
modules/mqtt/pom.xml | 9 +-
.../apache/ignite/stream/mqtt/MqttStreamer.java | 156 ++++++++++++++++++-
.../stream/mqtt/IgniteMqttStreamerTest.java | 80 +++++++++-
3 files changed, 224 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mqtt/pom.xml b/modules/mqtt/pom.xml
index 4b0b46c..21511e8 100644
--- a/modules/mqtt/pom.xml
+++ b/modules/mqtt/pom.xml
@@ -37,7 +37,7 @@
<properties>
<paho.version>1.0.2</paho.version>
- <activemq.version>5.11.1</activemq.version>
+ <activemq.version>5.12.0</activemq.version>
<guava-retryier.version>2.0.0</guava-retryier.version>
</properties>
@@ -69,13 +69,6 @@
<dependency>
<groupId>org.apache.activemq</groupId>
- <artifactId>activemq-kahadb-store</artifactId>
- <version>${activemq.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.activemq</groupId>
<artifactId>activemq-mqtt</artifactId>
<version>${activemq.version}</version>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
index b86d385..f18ae42 100644
--- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -62,12 +62,17 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
* <li>Specifying the subscriber's QoS for a single topic or for multiple topics.</li>
* <li>Allows setting {@link MqttConnectOptions} to support features like last will testament, persistent
* sessions, etc.</li>
- * <li>Specifying the client ID.</li>
+ * <li>Specifying the client ID. A random one will be generated and maintained throughout reconnections if the user
+ * does not provide one.</li>
+ * <li>(Re-)Connection retries based on the <i>guava-retrying</i> library. Retry wait and retry stop policies
+ * can be configured.</li>
+ * <li>Blocking the start() method until connected for the first time.</li>
* </ul>
*
- * Note: features like durable subscriptions, last will testament, etc. must be configured via the
+ * Note: features like durable subscriptions, last will testament, etc. can be configured via the
* {@link #connectOptions} property.
*
+ * @see <a href="https://github.com/rholder/guava-retrying">guava-retrying library</a>
* @author Raul Kripalani
*/
public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> implements MqttCallback {
@@ -75,46 +80,65 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
/** Logger. */
private IgniteLogger log;
+ /** The MQTT client object for internal use. */
private MqttClient client;
+ /** The broker URL, set by the user. */
private String brokerUrl;
+ /** The topic to subscribe to, if a single topic. */
private String topic;
+ /** The quality of service to use for a single topic subscription (optional). */
private Integer qualityOfService;
+ /** The topics to subscribe to, if many. */
private List<String> topics;
+ /** The qualities of service to use for multiple topic subscriptions. If specified, it must contain the same
+ * number of elements as {@link #topics}. */
private List<Integer> qualitiesOfService;
- /** Client ID in case we're using durable subscribers. */
+ /** The MQTT client ID (optional). */
private String clientId;
+ /** A configurable persistence mechanism. If not set, Paho will use its default. */
private MqttClientPersistence persistence;
+ /** The MQTT client connect options, where users can configure the last will and testament, durability, etc. */
private MqttConnectOptions connectOptions;
- // disconnect parameters
+ /** Quiesce timeout on disconnection. */
private Integer disconnectQuiesceTimeout;
+ /** Whether to disconnect forcibly or not. */
private boolean disconnectForcibly;
+ /** If disconnecting forcibly, the timeout. */
private Integer disconnectForciblyTimeout;
+ /** The strategy to determine how long to wait between retry attempts. By default, this streamer uses a
+ * Fibonacci-based strategy. */
private WaitStrategy retryWaitStrategy = WaitStrategies.fibonacciWait();
+ /** The strategy to determine when to stop retrying to (re-)connect. By default, we never stop. */
private StopStrategy retryStopStrategy = StopStrategies.neverStop();
+ /** The internal connection retrier object with a thread pool of size 1. */
private MqttConnectionRetrier connectionRetrier;
+ /** Whether to block the start() method until connected for the first time. */
+ private boolean blockUntilConnected;
+
+ /** State keeping. */
private volatile boolean stopped = true;
+ /** State keeping. */
private volatile boolean connected;
+ /** Cached log prefix for cache messages. */
private String cachedLogPrefix;
- private boolean blockUntilConnected;
-
/**
* Starts streamer.
*
@@ -136,7 +160,11 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " +
"both single and multiple tuple extractor");
A.notNullOrEmpty(brokerUrl, "broker URL");
- A.notNullOrEmpty(clientId, "client ID");
+
+ // if the client ID is empty, generate one
+ if (clientId == null || clientId.length() == 0) {
+ clientId = MqttClient.generateClientId();
+ }
// if we have both a single topic and a list of topics (but the list of topic is not of
// size 1 and == topic, as this would be a case of re-initialization), fail
@@ -257,6 +285,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
// MQTT Client callback methods
// -------------------------------
+ /**
+ * {@inheritDoc}
+ */
@Override public void connectionLost(Throwable throwable) {
connected = false;
@@ -268,6 +299,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
connectionRetrier.connect();
}
+ /**
+ * {@inheritDoc}
+ */
@Override public void messageArrived(String topic, MqttMessage message) throws Exception {
if (getMultipleTupleExtractor() != null) {
Map<K, V> entries = getMultipleTupleExtractor().extract(message);
@@ -285,6 +319,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override public void deliveryComplete(IMqttDeliveryToken token) {
// ignore, as we don't send messages
}
@@ -293,127 +330,229 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
// Getters and setters
// -------------------------------
+ /**
+ * @return
+ */
public String getBrokerUrl() {
return brokerUrl;
}
+ /**
+ * @param brokerUrl The Broker URL (compulsory).
+ */
public void setBrokerUrl(String brokerUrl) {
this.brokerUrl = brokerUrl;
}
+ /**
+ * @return
+ */
public String getTopic() {
return topic;
}
+ /**
+ * @param topic The topic to subscribe to, if a single topic.
+ */
public void setTopic(String topic) {
this.topic = topic;
}
+ /**
+ * @return
+ */
public Integer getQualityOfService() {
return qualityOfService;
}
+ /**
+ * @param qualityOfService The quality of service to use for a single topic subscription (optional).
+ */
public void setQualityOfService(Integer qualityOfService) {
this.qualityOfService = qualityOfService;
}
+ /**
+ * @return
+ */
public List<String> getTopics() {
return topics;
}
+ /**
+ * @param topics The topics to subscribe to, if many.
+ */
public void setTopics(List<String> topics) {
this.topics = topics;
}
+ /**
+ * @return
+ */
public List<Integer> getQualitiesOfService() {
return qualitiesOfService;
}
+ /**
+ * @param qualitiesOfService The qualities of service to use for multiple topic subscriptions.
+ * If specified, the list must contain the same number of elements as {@link #topics}.
+ */
public void setQualitiesOfService(List<Integer> qualitiesOfService) {
this.qualitiesOfService = qualitiesOfService;
}
+ /**
+ * @return
+ */
public String getClientId() {
return clientId;
}
+ /**
+ * @param clientId The MQTT client ID (optional). If one is not provided, we'll create one for you and maintain
+ * it throughout any reconnection attempts.
+ */
public void setClientId(String clientId) {
this.clientId = clientId;
}
+ /**
+ * @return
+ */
public MqttClientPersistence getPersistence() {
return persistence;
}
+ /**
+ * @param persistence A configurable persistence mechanism. If not set, Paho will use its default.
+ */
public void setPersistence(MqttClientPersistence persistence) {
this.persistence = persistence;
}
+ /**
+ * @return
+ */
public MqttConnectOptions getConnectOptions() {
return connectOptions;
}
+ /**
+ * @param connectOptions The MQTT client connect options, where users can configure the last will and testament, durability, etc.
+ */
public void setConnectOptions(MqttConnectOptions connectOptions) {
this.connectOptions = connectOptions;
}
+ /**
+ * @return
+ */
public boolean isDisconnectForcibly() {
return disconnectForcibly;
}
+ /**
+ * @param disconnectForcibly Whether to disconnect forcibly or not. By default, it's false.
+ */
public void setDisconnectForcibly(boolean disconnectForcibly) {
this.disconnectForcibly = disconnectForcibly;
}
+ /**
+ * @return
+ */
public Integer getDisconnectQuiesceTimeout() {
return disconnectQuiesceTimeout;
}
+ /**
+ * @param disconnectQuiesceTimeout Quiesce timeout on disconnection. If not provided, this streamer won't use any.
+ */
public void setDisconnectQuiesceTimeout(Integer disconnectQuiesceTimeout) {
this.disconnectQuiesceTimeout = disconnectQuiesceTimeout;
}
+ /**
+ * @return
+ */
public Integer getDisconnectForciblyTimeout() {
return disconnectForciblyTimeout;
}
+ /**
+ * @param disconnectForciblyTimeout If disconnecting forcibly, the timeout. Compulsory in that case.
+ */
public void setDisconnectForciblyTimeout(Integer disconnectForciblyTimeout) {
this.disconnectForciblyTimeout = disconnectForciblyTimeout;
}
+ /**
+ * @return
+ */
public WaitStrategy getRetryWaitStrategy() {
return retryWaitStrategy;
}
+ /**
+ * @param retryWaitStrategy The strategy to determine how long to wait between retry attempts.
+ * By default, this streamer uses a Fibonacci-based strategy.
+ */
public void setRetryWaitStrategy(WaitStrategy retryWaitStrategy) {
this.retryWaitStrategy = retryWaitStrategy;
}
+ /**
+ * @return
+ */
public StopStrategy getRetryStopStrategy() {
return retryStopStrategy;
}
+ /**
+ * @param retryStopStrategy The strategy to determine when to stop retrying to (re-)connect. By default, we never stop.
+ */
public void setRetryStopStrategy(StopStrategy retryStopStrategy) {
this.retryStopStrategy = retryStopStrategy;
}
+ /**
+ * @return
+ */
public boolean isBlockUntilConnected() {
return blockUntilConnected;
}
+ /**
+ * @param blockUntilConnected Whether to block the start() method until connected for the first time. By default,
+ * false.
+ */
public void setBlockUntilConnected(boolean blockUntilConnected) {
this.blockUntilConnected = blockUntilConnected;
}
+ /**
+ * A utility class to help us with (re-)connecting to the MQTT broker. It uses a single-threaded executor to perform
+ * the (re-)connections.
+ */
private class MqttConnectionRetrier {
+ /** The guava-retrying retrier object. */
private final Retryer<Boolean> retrier;
+
+ /** Single-threaded pool. */
private ExecutorService executor = Executors.newSingleThreadExecutor();
+ /**
+ * Constructor.
+ * @param retrier The retrier object.
+ */
public MqttConnectionRetrier(Retryer<Boolean> retrier) {
this.retrier = retrier;
}
+ /**
+ * Method that is called by the streamer to ask us to (re-)connect.
+ */
public void connect() {
Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
@@ -460,6 +599,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
}
}
+ /**
+ * Stops this connection utility class by shutting down the thread pool.
+ */
public void stop() {
executor.shutdownNow();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
index 012486a..5ac7339 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -65,24 +65,41 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
*/
public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
+ /** The test data. */
private static final Map<Integer, String> TEST_DATA = new HashMap<>();
+
+ /** Topic name for single topic tests. */
private static final String SINGLE_TOPIC_NAME = "abc";
+
+ /** Topic names for multiple topic tests. */
private static final List<String> MULTIPLE_TOPIC_NAMES = Arrays.asList("def", "ghi", "jkl", "mno");
+ /** The AMQ broker with an MQTT interface. */
private BrokerService broker;
+
+ /** The MQTT client. */
private MqttClient client;
+
+ /** The broker URL. */
private String brokerUrl;
+
+ /** The broker port. **/
private int port;
+
+ /** The MQTT streamer currently under test. */
private MqttStreamer<Integer, String> streamer;
+
+ /** The UUID of the currently active remote listener. */
private UUID remoteListener;
+ /** The Ignite data streamer. */
+ private IgniteDataStreamer<Integer, String> dataStreamer;
+
static {
for (int i = 0; i < 100; i++)
TEST_DATA.put(i, "v" + i);
}
- private IgniteDataStreamer<Integer, String> dataStreamer;
-
/** Constructor. */
public IgniteMqttStreamerTest() {
super(true);
@@ -99,14 +116,17 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
// create the broker
broker = new BrokerService();
- broker.deleteAllMessages();
+ broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(false);
+ broker.setPersistenceAdapter(null);
+ broker.setPersistenceFactory(null);
PolicyMap policyMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry();
policy.setQueuePrefetch(1);
broker.setDestinationPolicy(policyMap);
broker.getDestinationPolicy().setDefaultEntry(policy);
+ broker.setSchedulerSupport(false);
// add the MQTT transport connector to the broker
broker.addConnector("mqtt://localhost:" + port);
@@ -143,6 +163,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
}
+ /**
+ * @throws Exception
+ */
public void testSingleTopic_NoQoS_OneEntryPerMessage() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -162,6 +185,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertCacheEntriesLoaded(50);
}
+ /**
+ * @throws Exception
+ */
public void testMultipleTopics_NoQoS_OneEntryPerMessage() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -185,6 +211,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
}
+ /**
+ * @throws Exception
+ */
public void testSingleTopic_NoQoS_MultipleEntriesOneMessage() throws Exception {
// configure streamer
streamer.setMultipleTupleExtractor(multipleTupleExtractor());
@@ -204,6 +233,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertCacheEntriesLoaded(50);
}
+ /**
+ * @throws Exception
+ */
public void testMultipleTopics_NoQoS_MultipleEntriesOneMessage() throws Exception {
// configure streamer
streamer.setMultipleTupleExtractor(multipleTupleExtractor());
@@ -227,6 +259,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
}
+ /**
+ * @throws Exception
+ */
public void testSingleTopic_NoQoS_ConnectOptions_Durable() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -265,6 +300,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertCacheEntriesLoaded(100);
}
+ /**
+ * @throws Exception
+ */
public void testSingleTopic_NoQoS_Reconnect() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -306,6 +344,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertCacheEntriesLoaded(100);
}
+ /**
+ * @throws Exception
+ */
public void testSingleTopic_NoQoS_RetryOnce() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -339,6 +380,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
}
+ /**
+ * @throws Exception
+ */
public void testMultipleTopics_MultipleQoS_OneEntryPerMessage() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -363,6 +407,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
}
+ /**
+ * @throws Exception
+ */
public void testMultipleTopics_MultipleQoS_Mismatch() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -379,6 +426,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
}
+ /**
+ * @return A new {@link MqttStreamer} bound to the test grid.
+ */
private MqttStreamer<Integer, String> createMqttStreamer(IgniteDataStreamer<Integer, String> dataStreamer) {
MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
streamer.setIgnite(grid());
@@ -393,7 +443,10 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
return streamer;
}
- public void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
+ /**
+ * @throws MqttException
+ */
+ private void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
if (singleMessage) {
final List<StringBuilder> sbs = new ArrayList<>(topics.size());
// initialize String Builders for each topic
@@ -423,6 +476,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
}
}
+ /**
+ * @throws Exception
+ */
private CountDownLatch subscribeToPutEvents(int expect) {
Ignite ignite = grid();
@@ -439,14 +495,16 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
return latch;
}
+ /**
+ * @param count The number of entries expected to be present in the cache.
+ */
private void assertCacheEntriesLoaded(int count) {
// get the cache and check that the entries are present
IgniteCache<Integer, String> cache = grid().cache(null);
// for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache
- for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count)) {
+ for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count))
assertEquals(TEST_DATA.get(key), cache.get(key));
- }
// assert that the cache exactly the specified amount of elements
assertEquals(count, cache.size(CachePeekMode.ALL));
@@ -455,6 +513,11 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener);
}
+ /**
+ * Returns a {@link StreamSingleTupleExtractor} for testing.
+ *
+ * @throws Exception
+ */
public static StreamSingleTupleExtractor<MqttMessage, Integer, String> singleTupleExtractor() {
return new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
@Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
@@ -464,6 +527,11 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
};
}
+ /**
+ * Returns a {@link StreamMultipleTupleExtractor} for testing.
+ *
+ * @throws Exception
+ */
public static StreamMultipleTupleExtractor<MqttMessage, Integer, String> multipleTupleExtractor() {
return new StreamMultipleTupleExtractor<MqttMessage, Integer, String>() {
@Override public Map<Integer, String> extract(MqttMessage msg) {