You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ra...@apache.org on 2015/09/21 18:25:42 UTC

[32/32] ignite git commit: IGNITE-535 Finish MQTT Streamer docs and tests. Upgrade latter to AMQ 5.12.0.

IGNITE-535 Finish MQTT Streamer docs and tests. Upgrade latter to AMQ 5.12.0.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/296dd6e7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/296dd6e7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/296dd6e7

Branch: refs/heads/feature/ignite-535-mqtt
Commit: 296dd6e7d86fe6d0914a9fbf8062632c04e4d22c
Parents: f03f3a3
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 17:24:44 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 17:24:44 2015 +0100

----------------------------------------------------------------------
 modules/mqtt/pom.xml                            |   9 +-
 .../apache/ignite/stream/mqtt/MqttStreamer.java | 156 ++++++++++++++++++-
 .../stream/mqtt/IgniteMqttStreamerTest.java     |  80 +++++++++-
 3 files changed, 224 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mqtt/pom.xml b/modules/mqtt/pom.xml
index 4b0b46c..21511e8 100644
--- a/modules/mqtt/pom.xml
+++ b/modules/mqtt/pom.xml
@@ -37,7 +37,7 @@
 
     <properties>
         <paho.version>1.0.2</paho.version>
-        <activemq.version>5.11.1</activemq.version>
+        <activemq.version>5.12.0</activemq.version>
         <guava-retryier.version>2.0.0</guava-retryier.version>
     </properties>
 
@@ -69,13 +69,6 @@
 
         <dependency>
             <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-kahadb-store</artifactId>
-            <version>${activemq.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.activemq</groupId>
             <artifactId>activemq-mqtt</artifactId>
             <version>${activemq.version}</version>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
index b86d385..f18ae42 100644
--- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -62,12 +62,17 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
  *     <li>Specifying the subscriber's QoS for a single topic or for multiple topics.</li>
  *     <li>Allows setting {@link MqttConnectOptions} to support features like last will testament, persistent
  *         sessions, etc.</li>
- *     <li>Specifying the client ID.</li>
+ *     <li>Specifying the client ID. A random one will be generated and maintained throughout reconnections if the user
+ *         does not provide one.</li>
+ *     <li>(Re-)Connection retries based on the <i>guava-retrying</i> library. Retry wait and retry stop policies
+ *         can be configured.</li>
+ *     <li>Blocking the start() method until connected for the first time.</li>
  * </ul>
  *
- * Note: features like durable subscriptions, last will testament, etc. must be configured via the
+ * Note: features like durable subscriptions, last will testament, etc. can be configured via the
  * {@link #connectOptions} property.
  *
+ * @see <a href="https://github.com/rholder/guava-retrying">guava-retrying library</a>
  * @author Raul Kripalani
  */
 public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> implements MqttCallback {
@@ -75,46 +80,65 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
     /** Logger. */
     private IgniteLogger log;
 
+    /** The MQTT client object for internal use. */
     private MqttClient client;
 
+    /** The broker URL, set by the user. */
     private String brokerUrl;
 
+    /** The topic to subscribe to, if a single topic. */
     private String topic;
 
+    /** The quality of service to use for a single topic subscription (optional). */
     private Integer qualityOfService;
 
+    /** The topics to subscribe to, if many. */
     private List<String> topics;
 
+    /** The qualities of service to use for multiple topic subscriptions. If specified, it must contain the same
+     *  number of elements as {@link #topics}. */
     private List<Integer> qualitiesOfService;
 
-    /** Client ID in case we're using durable subscribers. */
+    /** The MQTT client ID (optional). */
     private String clientId;
 
+    /** A configurable persistence mechanism. If not set, Paho will use its default. */
     private MqttClientPersistence persistence;
 
+    /** The MQTT client connect options, where users can configure the last will and testament, durability, etc. */
     private MqttConnectOptions connectOptions;
 
-    // disconnect parameters
+    /** Quiesce timeout on disconnection. */
     private Integer disconnectQuiesceTimeout;
 
+    /** Whether to disconnect forcibly or not. */
     private boolean disconnectForcibly;
 
+    /** If disconnecting forcibly, the timeout. */
     private Integer disconnectForciblyTimeout;
 
+    /** The strategy to determine how long to wait between retry attempts. By default, this streamer uses a
+     *  Fibonacci-based strategy. */
     private WaitStrategy retryWaitStrategy = WaitStrategies.fibonacciWait();
 
+    /** The strategy to determine when to stop retrying to (re-)connect. By default, we never stop. */
     private StopStrategy retryStopStrategy = StopStrategies.neverStop();
 
+    /** The internal connection retrier object with a thread pool of size 1. */
     private MqttConnectionRetrier connectionRetrier;
 
+    /** Whether to block the start() method until connected for the first time. */
+    private boolean blockUntilConnected;
+
+    /** State keeping. */
     private volatile boolean stopped = true;
 
+    /** State keeping. */
     private volatile boolean connected;
 
+    /** Cached log prefix for cache messages. */
     private String cachedLogPrefix;
 
-    private boolean blockUntilConnected;
-
     /**
      * Starts streamer.
      *
@@ -136,7 +160,11 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
             A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " +
                 "both single and multiple tuple extractor");
             A.notNullOrEmpty(brokerUrl, "broker URL");
-            A.notNullOrEmpty(clientId, "client ID");
+
+            // if the client ID is empty, generate one
+            if (clientId == null || clientId.length() == 0) {
+                clientId = MqttClient.generateClientId();
+            }
 
             // if we have both a single topic and a list of topics (but the list of topic is not of
             // size 1 and == topic, as this would be a case of re-initialization), fail
@@ -257,6 +285,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
     //  MQTT Client callback methods
     // -------------------------------
 
+    /**
+     * {@inheritDoc}
+     */
     @Override public void connectionLost(Throwable throwable) {
         connected = false;
 
@@ -268,6 +299,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
         connectionRetrier.connect();
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override public void messageArrived(String topic, MqttMessage message) throws Exception {
         if (getMultipleTupleExtractor() != null) {
             Map<K, V> entries = getMultipleTupleExtractor().extract(message);
@@ -285,6 +319,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override public void deliveryComplete(IMqttDeliveryToken token) {
         // ignore, as we don't send messages
     }
@@ -293,127 +330,229 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
     //  Getters and setters
     // -------------------------------
 
+    /**
+     * @return
+     */
     public String getBrokerUrl() {
         return brokerUrl;
     }
 
+    /**
+     * @param brokerUrl The Broker URL (compulsory).
+     */
     public void setBrokerUrl(String brokerUrl) {
         this.brokerUrl = brokerUrl;
     }
 
+    /**
+     * @return
+     */
     public String getTopic() {
         return topic;
     }
 
+    /**
+     * @param topic The topic to subscribe to, if a single topic.
+     */
     public void setTopic(String topic) {
         this.topic = topic;
     }
 
+    /**
+     * @return
+     */
     public Integer getQualityOfService() {
         return qualityOfService;
     }
 
+    /**
+     * @param qualityOfService The quality of service to use for a single topic subscription (optional).
+     */
     public void setQualityOfService(Integer qualityOfService) {
         this.qualityOfService = qualityOfService;
     }
 
+    /**
+     * @return
+     */
     public List<String> getTopics() {
         return topics;
     }
 
+    /**
+     * @param topics The topics to subscribe to, if many.
+     */
     public void setTopics(List<String> topics) {
         this.topics = topics;
     }
 
+    /**
+     * @return
+     */
     public List<Integer> getQualitiesOfService() {
         return qualitiesOfService;
     }
 
+    /**
+     * @param qualitiesOfService The qualities of service to use for multiple topic subscriptions.
+     * If specified, the list must contain the same number of elements as {@link #topics}.
+     */
     public void setQualitiesOfService(List<Integer> qualitiesOfService) {
         this.qualitiesOfService = qualitiesOfService;
     }
 
+    /**
+     * @return
+     */
     public String getClientId() {
         return clientId;
     }
 
+    /**
+     * @param clientId The MQTT client ID (optional). If one is not provided, we'll create one for you and maintain
+     * it throughout any reconnection attempts.
+     */
     public void setClientId(String clientId) {
         this.clientId = clientId;
     }
 
+    /**
+     * @return
+     */
     public MqttClientPersistence getPersistence() {
         return persistence;
     }
 
+    /**
+     * @param persistence A configurable persistence mechanism. If not set, Paho will use its default.
+     */
     public void setPersistence(MqttClientPersistence persistence) {
         this.persistence = persistence;
     }
 
+    /**
+     * @return
+     */
     public MqttConnectOptions getConnectOptions() {
         return connectOptions;
     }
 
+    /**
+     * @param connectOptions The MQTT client connect options, where users can configure the last will and testament, durability, etc.
+     */
     public void setConnectOptions(MqttConnectOptions connectOptions) {
         this.connectOptions = connectOptions;
     }
 
+    /**
+     * @return
+     */
     public boolean isDisconnectForcibly() {
         return disconnectForcibly;
     }
 
+    /**
+     * @param disconnectForcibly Whether to disconnect forcibly or not. By default, it's false.
+     */
     public void setDisconnectForcibly(boolean disconnectForcibly) {
         this.disconnectForcibly = disconnectForcibly;
     }
 
+    /**
+     * @return
+     */
     public Integer getDisconnectQuiesceTimeout() {
         return disconnectQuiesceTimeout;
     }
 
+    /**
+     * @param disconnectQuiesceTimeout Quiesce timeout on disconnection. If not provided, this streamer won't use any.
+     */
     public void setDisconnectQuiesceTimeout(Integer disconnectQuiesceTimeout) {
         this.disconnectQuiesceTimeout = disconnectQuiesceTimeout;
     }
 
+    /**
+     * @return
+     */
     public Integer getDisconnectForciblyTimeout() {
         return disconnectForciblyTimeout;
     }
 
+    /**
+     * @param disconnectForciblyTimeout If disconnecting forcibly, the timeout. Compulsory in that case.
+     */
     public void setDisconnectForciblyTimeout(Integer disconnectForciblyTimeout) {
         this.disconnectForciblyTimeout = disconnectForciblyTimeout;
     }
 
+    /**
+     * @return
+     */
     public WaitStrategy getRetryWaitStrategy() {
         return retryWaitStrategy;
     }
 
+    /**
+     * @param retryWaitStrategy The strategy to determine how long to wait between retry attempts.
+     * By default, this streamer uses a Fibonacci-based strategy.
+     */
     public void setRetryWaitStrategy(WaitStrategy retryWaitStrategy) {
         this.retryWaitStrategy = retryWaitStrategy;
     }
 
+    /**
+     * @return
+     */
     public StopStrategy getRetryStopStrategy() {
         return retryStopStrategy;
     }
 
+    /**
+     * @param retryStopStrategy The strategy to determine when to stop retrying to (re-)connect. By default, we never stop.
+     */
     public void setRetryStopStrategy(StopStrategy retryStopStrategy) {
         this.retryStopStrategy = retryStopStrategy;
     }
 
+    /**
+     * @return
+     */
     public boolean isBlockUntilConnected() {
         return blockUntilConnected;
     }
 
+    /**
+     * @param blockUntilConnected Whether to block the start() method until connected for the first time. By default,
+     * false.
+     */
     public void setBlockUntilConnected(boolean blockUntilConnected) {
         this.blockUntilConnected = blockUntilConnected;
     }
 
+    /**
+     * A utility class to help us with (re-)connecting to the MQTT broker. It uses a single-threaded executor to perform
+     * the (re-)connections.
+     */
     private class MqttConnectionRetrier {
 
+        /** The guava-retrying retrier object. */
         private final Retryer<Boolean> retrier;
+
+        /** Single-threaded pool. */
         private ExecutorService executor = Executors.newSingleThreadExecutor();
 
+        /**
+         * Constructor.
+         * @param retrier The retrier object.
+         */
         public MqttConnectionRetrier(Retryer<Boolean> retrier) {
             this.retrier = retrier;
         }
 
+        /**
+         * Method that is called by the streamer to ask us to (re-)connect.
+         */
         public void connect() {
             Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>() {
                 @Override public Boolean call() throws Exception {
@@ -460,6 +599,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
             }
         }
 
+        /**
+         * Stops this connection utility class by shutting down the thread pool.
+         */
         public void stop() {
             executor.shutdownNow();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
index 012486a..5ac7339 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -65,24 +65,41 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
  */
 public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
+    /** The test data. */
     private static final Map<Integer, String> TEST_DATA = new HashMap<>();
+
+    /** Topic name for single topic tests. */
     private static final String SINGLE_TOPIC_NAME = "abc";
+
+    /** Topic names for multiple topic tests. */
     private static final List<String> MULTIPLE_TOPIC_NAMES = Arrays.asList("def", "ghi", "jkl", "mno");
 
+    /** The AMQ broker with an MQTT interface. */
     private BrokerService broker;
+
+    /** The MQTT client. */
     private MqttClient client;
+
+    /** The broker URL. */
     private String brokerUrl;
+
+    /** The broker port. **/
     private int port;
+
+    /** The MQTT streamer currently under test. */
     private MqttStreamer<Integer, String> streamer;
+
+    /** The UUID of the currently active remote listener. */
     private UUID remoteListener;
 
+    /** The Ignite data streamer. */
+    private IgniteDataStreamer<Integer, String> dataStreamer;
+
     static {
         for (int i = 0; i < 100; i++)
             TEST_DATA.put(i, "v" + i);
     }
 
-    private IgniteDataStreamer<Integer, String> dataStreamer;
-
     /** Constructor. */
     public IgniteMqttStreamerTest() {
         super(true);
@@ -99,14 +116,17 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
         // create the broker
         broker = new BrokerService();
-        broker.deleteAllMessages();
+        broker.setDeleteAllMessagesOnStartup(true);
         broker.setPersistent(false);
+        broker.setPersistenceAdapter(null);
+        broker.setPersistenceFactory(null);
 
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry policy = new PolicyEntry();
         policy.setQueuePrefetch(1);
         broker.setDestinationPolicy(policyMap);
         broker.getDestinationPolicy().setDefaultEntry(policy);
+        broker.setSchedulerSupport(false);
 
         // add the MQTT transport connector to the broker
         broker.addConnector("mqtt://localhost:" + port);
@@ -143,6 +163,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
     }
 
+    /**
+     * @throws Exception
+     */
     public void testSingleTopic_NoQoS_OneEntryPerMessage() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -162,6 +185,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertCacheEntriesLoaded(50);
     }
 
+    /**
+     * @throws Exception
+     */
     public void testMultipleTopics_NoQoS_OneEntryPerMessage() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -185,6 +211,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
     }
 
+    /**
+     * @throws Exception
+     */
     public void testSingleTopic_NoQoS_MultipleEntriesOneMessage() throws Exception {
         // configure streamer
         streamer.setMultipleTupleExtractor(multipleTupleExtractor());
@@ -204,6 +233,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertCacheEntriesLoaded(50);
     }
 
+    /**
+     * @throws Exception
+     */
     public void testMultipleTopics_NoQoS_MultipleEntriesOneMessage() throws Exception {
         // configure streamer
         streamer.setMultipleTupleExtractor(multipleTupleExtractor());
@@ -227,6 +259,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
     }
 
+    /**
+     * @throws Exception
+     */
     public void testSingleTopic_NoQoS_ConnectOptions_Durable() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -265,6 +300,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertCacheEntriesLoaded(100);
     }
 
+    /**
+     * @throws Exception
+     */
     public void testSingleTopic_NoQoS_Reconnect() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -306,6 +344,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertCacheEntriesLoaded(100);
     }
 
+    /**
+     * @throws Exception
+     */
     public void testSingleTopic_NoQoS_RetryOnce() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -339,6 +380,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
     }
 
+    /**
+     * @throws Exception
+     */
     public void testMultipleTopics_MultipleQoS_OneEntryPerMessage() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -363,6 +407,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
     }
 
+    /**
+     * @throws Exception
+     */
     public void testMultipleTopics_MultipleQoS_Mismatch() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -379,6 +426,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
     }
 
+    /**
+     * Creates and returns an {@link MqttStreamer} bound to the test grid.
+     */
     private MqttStreamer<Integer, String> createMqttStreamer(IgniteDataStreamer<Integer, String> dataStreamer) {
         MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
         streamer.setIgnite(grid());
@@ -393,7 +443,10 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         return streamer;
     }
 
-    public void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
+    /**
+     * @throws MqttException If sending a message fails.
+     */
+    private void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
         if (singleMessage) {
             final List<StringBuilder> sbs = new ArrayList<>(topics.size());
             // initialize String Builders for each topic
@@ -423,6 +476,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         }
     }
 
+    /**
+     * @return The latch created for the subscription.
+     */
     private CountDownLatch subscribeToPutEvents(int expect) {
         Ignite ignite = grid();
 
@@ -439,14 +495,16 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         return latch;
     }
 
+    /**
+     * Asserts that the first {@code count} test entries are in the cache and that the cache size equals {@code count}.
+     */
     private void assertCacheEntriesLoaded(int count) {
         // get the cache and check that the entries are present
         IgniteCache<Integer, String> cache = grid().cache(null);
 
         // for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache
-        for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count)) {
+        for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count))
             assertEquals(TEST_DATA.get(key), cache.get(key));
-        }
 
         // assert that the cache exactly the specified amount of elements
         assertEquals(count, cache.size(CachePeekMode.ALL));
@@ -455,6 +513,11 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener);
     }
 
+    /**
+     * Returns a {@link StreamSingleTupleExtractor} for testing.
+     *
+     * @return The extractor.
+     */
     public static StreamSingleTupleExtractor<MqttMessage, Integer, String> singleTupleExtractor() {
         return new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
             @Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
@@ -464,6 +527,11 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         };
     }
 
+    /**
+     * Returns a {@link StreamMultipleTupleExtractor} for testing.
+     *
+     * @return The extractor.
+     */
     public static StreamMultipleTupleExtractor<MqttMessage, Integer, String> multipleTupleExtractor() {
         return new StreamMultipleTupleExtractor<MqttMessage, Integer, String>() {
             @Override public Map<Integer, String> extract(MqttMessage msg) {