You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/28 11:18:08 UTC

[03/33] ignite git commit: IGNITE-1747 MQTT Streamer: remove 'connected' flag and add 2 tests.

IGNITE-1747 MQTT Streamer: remove 'connected' flag and add 2 tests.

MQTT Connection State can be checked by calling MqttClient#isConnected.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e487226
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e487226
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e487226

Branch: refs/heads/ignite-1093-3
Commit: 0e48722679226a0845ff74dd447f77c2da50ece4
Parents: 056490d
Author: Raul Kripalani <ra...@apache.org>
Authored: Fri Oct 23 17:51:46 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Fri Oct 23 17:51:46 2015 +0100

----------------------------------------------------------------------
 .../apache/ignite/stream/mqtt/MqttStreamer.java | 43 +++++++----------
 .../stream/mqtt/IgniteMqttStreamerTest.java     | 50 +++++++++++++++++++-
 2 files changed, 67 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0e487226/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
index a075695..e546da2 100644
--- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -136,9 +136,6 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
     /** State keeping. */
     private volatile boolean stopped = true;
 
-    /** State keeping. */
-    private volatile boolean connected;
-
     /** Cached log prefix for cache messages. */
     private String cachedLogValues;
 
@@ -231,10 +228,10 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
             stopped = false;
 
             // Build retrier.
-            Retryer<Boolean> retrier = RetryerBuilder.<Boolean>newBuilder()
-                .retryIfResult(new Predicate<Boolean>() {
-                    @Override public boolean apply(Boolean connected) {
-                        return !connected;
+            Retryer<Void> retrier = RetryerBuilder.<Void>newBuilder()
+                .retryIfResult(new Predicate<Void>() {
+                    @Override public boolean apply(Void v) {
+                        return !client.isConnected() && !stopped;
                     }
                 })
                 .retryIfException().retryIfRuntimeException()
@@ -288,7 +285,6 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
 
             client.close();
 
-            connected = false;
             stopped = true;
         }
         catch (Exception e) {
@@ -307,9 +303,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
      * {@inheritDoc}
      */
     @Override public void connectionLost(Throwable throwable) {
-        connected = false;
-
-        // if we have been stopped, we do not try to establish the connection again
+        // If we have been stopped, we do not try to establish the connection again.
         if (stopped)
             return;
 
@@ -623,12 +617,13 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
     }
 
     /**
-     * Returns whether this streamer is connected.
+     * Returns whether this streamer is connected by delegating to the underlying {@link MqttClient#isConnected()}.
      *
      * @return {@code true} if connected; {@code false} if not.
+     * @see MqttClient#isConnected()
      */
     public boolean isConnected() {
-        return connected;
+        return client.isConnected();
     }
 
     /**
@@ -637,17 +632,17 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
      */
     private class MqttConnectionRetrier {
         /** The guava-retrying retrier object. */
-        private final Retryer<Boolean> retrier;
+        private final Retryer<Void> retrier;
 
         /** Single-threaded pool. */
-        private ExecutorService exec = Executors.newSingleThreadExecutor();
+        private final ExecutorService exec = Executors.newSingleThreadExecutor();
 
         /**
          * Constructor.
          *
          * @param retrier The retryier object.
          */
-        public MqttConnectionRetrier(Retryer<Boolean> retrier) {
+        public MqttConnectionRetrier(Retryer<Void> retrier) {
             this.retrier = retrier;
         }
 
@@ -655,14 +650,14 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
          * Method called by the streamer to ask us to (re-)connect.
          */
         public void connect() {
-            Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>() {
-                @Override public Boolean call() throws Exception {
+            Callable<Void> callable = retrier.wrap(new Callable<Void>() {
+                @Override public Void call() throws Exception {
                     // If we're already connected, return immediately.
-                    if (connected)
-                        return true;
+                    if (client.isConnected())
+                        return null;
 
                     if (stopped)
-                        return false;
+                        return null;
 
                     // Connect to broker.
                     if (connectOptions == null)
@@ -686,13 +681,11 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
 
                     log.info("MQTT Streamer (re-)connected and subscribed " + cachedLogValues);
 
-                    connected = true;
-
-                    return true;
+                    return null;
                 }
             });
 
-            Future<Boolean> result = exec.submit(callable);
+            Future<Void> result = exec.submit(callable);
 
             if (blockUntilConnected) {
                 try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e487226/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
index 6b07fde..891866d 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -172,6 +172,54 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testConnectDisconnect() throws Exception {
+        // configure streamer
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setTopic(SINGLE_TOPIC_NAME);
+        streamer.setBlockUntilConnected(true);
+
+        // action time: repeat 10 times; make sure the connection state is kept correctly every time
+        for (int i = 0; i < 10; i++) {
+            streamer.start();
+
+            assertTrue(streamer.isConnected());
+
+            streamer.stop();
+
+            assertFalse(streamer.isConnected());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionStatusWithBrokerDisconnection() throws Exception {
+        // configure streamer
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setTopic(SINGLE_TOPIC_NAME);
+        streamer.setBlockUntilConnected(true);
+        streamer.setRetryWaitStrategy(WaitStrategies.noWait());
+
+        streamer.start();
+
+        // action time: repeat 5 times; make sure the connection state is kept correctly every time
+        for (int i = 0; i < 5; i++) {
+            assertTrue(streamer.isConnected());
+
+            broker.stop();
+
+            assertFalse(streamer.isConnected());
+
+            broker.start(true);
+            broker.waitUntilStarted();
+
+            Thread.sleep(500);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testSingleTopic_NoQoS_OneEntryPerMessage() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -307,7 +355,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception If failed.
+     * @throws Exception
      */
     public void testSingleTopic_NoQoS_Reconnect() throws Exception {
         // configure streamer