You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/28 11:18:08 UTC
[03/33] ignite git commit: IGNITE-1747 MQTT Streamer: remove 'connected' flag and add 2 tests.
IGNITE-1747 MQTT Streamer: remove 'connected' flag and add 2 tests.
The MQTT connection state can be checked by calling MqttClient#isConnected, so a separate 'connected' flag is unnecessary.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e487226
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e487226
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e487226
Branch: refs/heads/ignite-1093-3
Commit: 0e48722679226a0845ff74dd447f77c2da50ece4
Parents: 056490d
Author: Raul Kripalani <ra...@apache.org>
Authored: Fri Oct 23 17:51:46 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Fri Oct 23 17:51:46 2015 +0100
----------------------------------------------------------------------
.../apache/ignite/stream/mqtt/MqttStreamer.java | 43 +++++++----------
.../stream/mqtt/IgniteMqttStreamerTest.java | 50 +++++++++++++++++++-
2 files changed, 67 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0e487226/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
index a075695..e546da2 100644
--- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -136,9 +136,6 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
/** State keeping. */
private volatile boolean stopped = true;
- /** State keeping. */
- private volatile boolean connected;
-
/** Cached log prefix for cache messages. */
private String cachedLogValues;
@@ -231,10 +228,10 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
stopped = false;
// Build retrier.
- Retryer<Boolean> retrier = RetryerBuilder.<Boolean>newBuilder()
- .retryIfResult(new Predicate<Boolean>() {
- @Override public boolean apply(Boolean connected) {
- return !connected;
+ Retryer<Void> retrier = RetryerBuilder.<Void>newBuilder()
+ .retryIfResult(new Predicate<Void>() {
+ @Override public boolean apply(Void v) {
+ return !client.isConnected() && !stopped;
}
})
.retryIfException().retryIfRuntimeException()
@@ -288,7 +285,6 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
client.close();
- connected = false;
stopped = true;
}
catch (Exception e) {
@@ -307,9 +303,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
* {@inheritDoc}
*/
@Override public void connectionLost(Throwable throwable) {
- connected = false;
-
- // if we have been stopped, we do not try to establish the connection again
+ // If we have been stopped, we do not try to establish the connection again.
if (stopped)
return;
@@ -623,12 +617,13 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
}
/**
- * Returns whether this streamer is connected.
+ * Returns whether this streamer is connected by delegating to the underlying {@link MqttClient#isConnected()}
*
* @return {@code true} if connected; {@code false} if not.
+ * @see MqttClient#isConnected()
*/
public boolean isConnected() {
- return connected;
+ return client.isConnected();
}
/**
@@ -637,17 +632,17 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
*/
private class MqttConnectionRetrier {
/** The guava-retrying retrier object. */
- private final Retryer<Boolean> retrier;
+ private final Retryer<Void> retrier;
/** Single-threaded pool. */
- private ExecutorService exec = Executors.newSingleThreadExecutor();
+ private final ExecutorService exec = Executors.newSingleThreadExecutor();
/**
* Constructor.
*
* @param retrier The retryier object.
*/
- public MqttConnectionRetrier(Retryer<Boolean> retrier) {
+ public MqttConnectionRetrier(Retryer<Void> retrier) {
this.retrier = retrier;
}
@@ -655,14 +650,14 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
* Method called by the streamer to ask us to (re-)connect.
*/
public void connect() {
- Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
+ Callable<Void> callable = retrier.wrap(new Callable<Void>() {
+ @Override public Void call() throws Exception {
// If we're already connected, return immediately.
- if (connected)
- return true;
+ if (client.isConnected())
+ return null;
if (stopped)
- return false;
+ return null;
// Connect to broker.
if (connectOptions == null)
@@ -686,13 +681,11 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
log.info("MQTT Streamer (re-)connected and subscribed " + cachedLogValues);
- connected = true;
-
- return true;
+ return null;
}
});
- Future<Boolean> result = exec.submit(callable);
+ Future<Void> result = exec.submit(callable);
if (blockUntilConnected) {
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/0e487226/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
index 6b07fde..891866d 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -172,6 +172,54 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testConnectDisconnect() throws Exception {
+ // configure streamer
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setTopic(SINGLE_TOPIC_NAME);
+ streamer.setBlockUntilConnected(true);
+
+ // action time: repeat 10 times; make sure the connection state is kept correctly every time
+ for (int i = 0; i < 10; i++) {
+ streamer.start();
+
+ assertTrue(streamer.isConnected());
+
+ streamer.stop();
+
+ assertFalse(streamer.isConnected());
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testConnectionStatusWithBrokerDisconnection() throws Exception {
+ // configure streamer
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setTopic(SINGLE_TOPIC_NAME);
+ streamer.setBlockUntilConnected(true);
+ streamer.setRetryWaitStrategy(WaitStrategies.noWait());
+
+ streamer.start();
+
+ // action time: repeat 5 times; make sure the connection state is kept correctly every time
+ for (int i = 0; i < 5; i++) {
+ assertTrue(streamer.isConnected());
+
+ broker.stop();
+
+ assertFalse(streamer.isConnected());
+
+ broker.start(true);
+ broker.waitUntilStarted();
+
+ Thread.sleep(500);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testSingleTopic_NoQoS_OneEntryPerMessage() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -307,7 +355,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception If failed.
+ * @throws Exception
*/
public void testSingleTopic_NoQoS_Reconnect() throws Exception {
// configure streamer