You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by an...@apache.org on 2015/10/28 10:30:06 UTC
[03/46] ignite git commit: IGNITE-1747 Fix code style and Javadoc issues in MQTT streamer.
IGNITE-1747 Fix code style and Javadoc issues in MQTT streamer.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cb0d432f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cb0d432f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cb0d432f
Branch: refs/heads/ignite-843-rc1
Commit: cb0d432f358615b12a65eb62f4b03b70680b3575
Parents: 43b40f4
Author: Raul Kripalani <ra...@apache.org>
Authored: Tue Oct 20 18:36:52 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Tue Oct 20 18:37:20 2015 +0100
----------------------------------------------------------------------
.../apache/ignite/stream/mqtt/MqttStreamer.java | 252 +++++++++++++------
.../stream/mqtt/IgniteMqttStreamerTest.java | 35 ++-
.../mqtt/IgniteMqttStreamerTestSuite.java | 2 +-
3 files changed, 206 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/cb0d432f/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
index f18ae42..39d8d6e 100644
--- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -18,6 +18,7 @@
package org.apache.ignite.stream.mqtt;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -70,10 +71,10 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
* </ul>
*
* Note: features like durable subscriptions, last will testament, etc. can be configured via the
- * {@link #connectOptions} property.
+ * {@link #setConnectOptions(MqttConnectOptions)} setter.
*
* @see <a href="https://github.com/rholder/guava-retrying">guava-retrying library</a>
- * @author Raul Kripalani
+ *
*/
public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> implements MqttCallback {
@@ -137,7 +138,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
private volatile boolean connected;
/** Cached log prefix for cache messages. */
- private String cachedLogPrefix;
+ private String cachedLogValues;
/**
* Starts streamer.
@@ -150,9 +151,12 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
// for simplicity, if these are null initialize to empty lists
topics = topics == null ? new ArrayList<String>() : topics;
+
qualitiesOfService = qualitiesOfService == null ? new ArrayList<Integer>() : qualitiesOfService;
try {
+ Map<String, Object> logValues = new HashMap<>();
+
// parameter validations
A.notNull(getStreamer(), "streamer");
A.notNull(getIgnite(), "ignite");
@@ -162,9 +166,8 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
A.notNullOrEmpty(brokerUrl, "broker URL");
// if the client ID is empty, generate one
- if (clientId == null || clientId.length() == 0) {
+ if (clientId == null || clientId.length() == 0)
clientId = MqttClient.generateClientId();
- }
// if we have both a single topic and a list of topics (but the list of topic is not of
// size 1 and == topic, as this would be a case of re-initialization), fail
@@ -174,15 +177,13 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
// same as above but for QoS
if (qualityOfService != null && !qualitiesOfService.isEmpty() && qualitiesOfService.size() != 1 &&
- !qualitiesOfService.get(0).equals(qualityOfService)) {
+ !qualitiesOfService.get(0).equals(qualityOfService))
throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time");
- }
// Paho API requires disconnect timeout if providing a quiesce timeout and disconnecting forcibly
- if (disconnectForcibly && disconnectQuiesceTimeout != null) {
+ if (disconnectForcibly && disconnectQuiesceTimeout != null)
A.notNull(disconnectForciblyTimeout, "disconnect timeout cannot be null when disconnecting forcibly " +
"with quiesce");
- }
// if we have multiple topics
if (!topics.isEmpty()) {
@@ -192,7 +193,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(), "qualities of " +
"service must be either empty or have the same size as topics list");
- cachedLogPrefix = "[" + Joiner.on(",").join(topics) + "]";
+ logValues.put("topics", topics);
}
else { // just the single topic
topics.add(topic);
@@ -200,9 +201,16 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
if (qualityOfService != null)
qualitiesOfService.add(qualityOfService);
- cachedLogPrefix = "[" + topic + "]";
+ logValues.put("topic", topic);
}
+ // finish building log values
+ logValues.put("brokerUrl", brokerUrl);
+ logValues.put("clientId", clientId);
+
+ // cache log values
+ cachedLogValues = "[" + Joiner.on(", ").withKeyValueSeparator("=").join(logValues) + "]";
+
// create logger
log = getIgnite().log();
@@ -232,6 +240,10 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
// create the connection retrier
connectionRetrier = new MqttConnectionRetrier(retrier);
+
+ log.info("Starting MQTT Streamer " + cachedLogValues);
+
+ // connect
connectionRetrier.connect();
}
@@ -286,6 +298,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
// -------------------------------
/**
+ * Implements the {@link MqttCallback#connectionLost(Throwable)} callback method for the MQTT client to inform the
+ * streamer that the connection has been lost.
+ *
* {@inheritDoc}
*/
@Override public void connectionLost(Throwable throwable) {
@@ -295,31 +310,41 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
if (stopped)
return;
- log.warning(String.format("MQTT Connection to server %s was lost.", brokerUrl), throwable);
+ log.warning(String.format("MQTT Connection to broker was lost [brokerUrl=%s, type=%s, err=%s]", brokerUrl,
+ throwable.getClass(), throwable.getMessage()));
+
connectionRetrier.connect();
}
/**
+ * Implements the {@link MqttCallback#messageArrived(String, MqttMessage)} to receive an MQTT message.
+ *
* {@inheritDoc}
*/
@Override public void messageArrived(String topic, MqttMessage message) throws Exception {
if (getMultipleTupleExtractor() != null) {
Map<K, V> entries = getMultipleTupleExtractor().extract(message);
- if (log.isTraceEnabled()) {
+
+ if (log.isTraceEnabled())
log.trace("Adding cache entries: " + entries);
- }
+
getStreamer().addData(entries);
}
else {
Map.Entry<K, V> entry = getSingleTupleExtractor().extract(message);
- if (log.isTraceEnabled()) {
+
+ if (log.isTraceEnabled())
log.trace("Adding cache entry: " + entry);
- }
+
getStreamer().addData(entry);
}
}
/**
+ * Empty implementation of {@link MqttCallback#deliveryComplete(IMqttDeliveryToken)}.
+ *
+ * Not required by the streamer as it doesn't produce messages.
+ *
* {@inheritDoc}
*/
@Override public void deliveryComplete(IMqttDeliveryToken token) {
@@ -331,13 +356,8 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
// -------------------------------
/**
- * @return
- */
- public String getBrokerUrl() {
- return brokerUrl;
- }
-
- /**
+ * Sets the broker URL (compulsory).
+ *
* @param brokerUrl The Broker URL (compulsory).
*/
public void setBrokerUrl(String brokerUrl) {
@@ -345,189 +365,262 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
}
/**
- * @return
+ * Gets the broker URL.
+ *
+ * @return The Broker URL.
*/
- public String getTopic() {
- return topic;
+ public String getBrokerUrl() {
+ return brokerUrl;
}
/**
- * @param topic The topic to subscribe to, if a single topic.
+ * Sets the topic to subscribe to, if a single topic.
+ *
+ * @param topic The topic to subscribe to.
*/
public void setTopic(String topic) {
this.topic = topic;
}
/**
- * @return
+ * Gets the subscribed topic.
+ *
+ * @return The subscribed topic.
*/
- public Integer getQualityOfService() {
- return qualityOfService;
+ public String getTopic() {
+ return topic;
}
/**
- * @param qualityOfService The quality of service to use for a single topic subscription (optional).
+ * Sets the quality of service to use for a single topic subscription (optional).
+ *
+ * @param qualityOfService The quality of service.
*/
public void setQualityOfService(Integer qualityOfService) {
this.qualityOfService = qualityOfService;
}
/**
- * @return
+ * Gets the quality of service set by the user for a single topic consumption.
+ *
+ * @return The quality of service.
*/
- public List<String> getTopics() {
- return topics;
+ public Integer getQualityOfService() {
+ return qualityOfService;
}
/**
- * @param topics The topics to subscribe to, if many.
+ * Sets the topics to subscribe to, if many.
+ *
+ * @param topics The topics.
*/
public void setTopics(List<String> topics) {
this.topics = topics;
}
/**
- * @return
+ * Gets the topics subscribed to.
+ *
+ * @return The topics subscribed to.
*/
- public List<Integer> getQualitiesOfService() {
- return qualitiesOfService;
+ public List<String> getTopics() {
+ return topics;
}
/**
- * @param qualitiesOfService The qualities of service to use for multiple topic subscriptions.
- * If specified, the list must contain the same number of elements as {@link #topics}.
+ * Sets the qualities of service to use for multiple topic subscriptions. If specified, the list must contain the
+ * same number of elements as {@link #topics}.
+ *
+ * @param qualitiesOfService The qualities of service.
*/
public void setQualitiesOfService(List<Integer> qualitiesOfService) {
this.qualitiesOfService = qualitiesOfService;
}
/**
- * @return
+ * Gets the qualities of service for multiple topics.
+ *
+ * @return The qualities of service.
*/
- public String getClientId() {
- return clientId;
+ public List<Integer> getQualitiesOfService() {
+ return qualitiesOfService;
}
/**
- * @param clientId The MQTT client ID (optional). If one is not provided, we'll create one for you and maintain
+ * Sets the MQTT client ID (optional). If one is not provided, the streamer will generate one and will maintain
* it throughout any reconnection attempts.
+ *
+ * @param clientId The client ID.
*/
public void setClientId(String clientId) {
this.clientId = clientId;
}
/**
- * @return
+ * Gets the client ID, either the one set by the user or the automatically generated one.
+ *
+ * @return The client ID.
+ */
+ public String getClientId() {
+ return clientId;
+ }
+
+ /**
+ * Gets the currently set persistence mechanism.
+ *
+ * @return The persistence mechanism.
*/
public MqttClientPersistence getPersistence() {
return persistence;
}
/**
- * @param persistence A configurable persistence mechanism. If not set, Paho will use its default.
+ * Sets the persistence mechanism. If not set, Paho will use its default.
+ *
+ * @param persistence A configurable persistence mechanism.
*/
public void setPersistence(MqttClientPersistence persistence) {
this.persistence = persistence;
}
/**
- * @return
+ * Gets the currently used MQTT client connect options.
+ *
+ * @return The MQTT client connect options.
*/
public MqttConnectOptions getConnectOptions() {
return connectOptions;
}
/**
- * @param connectOptions The MQTT client connect options, where users can configured the last will and testament, durability, etc.
+ * Sets the MQTT client connect options, where users can configure the last will and testament, durability, etc.
+ *
+ * @param connectOptions The MQTT client connect options.
*/
public void setConnectOptions(MqttConnectOptions connectOptions) {
this.connectOptions = connectOptions;
}
/**
- * @return
+ * Sets whether to disconnect forcibly or not when shutting down. By default, it's <tt>false</tt>.
+ *
+ * @param disconnectForcibly Whether to disconnect forcibly or not. By default, it's <tt>false</tt>.
+ */
+ public void setDisconnectForcibly(boolean disconnectForcibly) {
+ this.disconnectForcibly = disconnectForcibly;
+ }
+
+ /**
+ * Gets whether this MQTT client will disconnect forcibly when shutting down.
+ *
+ * @return Whether to disconnect forcibly or not.
*/
public boolean isDisconnectForcibly() {
return disconnectForcibly;
}
/**
- * @param disconnectForcibly Whether to disconnect forcibly or not. By default, it's false.
+ * Sets the quiesce timeout on disconnection. If not provided, this streamer won't use any.
+ *
+ * @param disconnectQuiesceTimeout The disconnect quiesce timeout.
*/
- public void setDisconnectForcibly(boolean disconnectForcibly) {
- this.disconnectForcibly = disconnectForcibly;
+ public void setDisconnectQuiesceTimeout(Integer disconnectQuiesceTimeout) {
+ this.disconnectQuiesceTimeout = disconnectQuiesceTimeout;
}
/**
- * @return
+ * Gets the disconnect quiesce timeout.
+ *
+ * @return The disconnect quiesce timeout.
*/
public Integer getDisconnectQuiesceTimeout() {
return disconnectQuiesceTimeout;
}
/**
- * @param disconnectQuiesceTimeout Quiesce timeout on disconnection. If not provided, this streamer won't use any.
+ * Sets the timeout if disconnecting forcibly. Compulsory in that case.
+ *
+ * @param disconnectForciblyTimeout The disconnect forcibly timeout.
*/
- public void setDisconnectQuiesceTimeout(Integer disconnectQuiesceTimeout) {
- this.disconnectQuiesceTimeout = disconnectQuiesceTimeout;
+ public void setDisconnectForciblyTimeout(Integer disconnectForciblyTimeout) {
+ this.disconnectForciblyTimeout = disconnectForciblyTimeout;
}
/**
- * @return
+ * Gets the timeout if disconnecting forcibly.
+ *
+ * @return Timeout.
*/
public Integer getDisconnectForciblyTimeout() {
return disconnectForciblyTimeout;
}
/**
- * @param disconnectForciblyTimeout If disconnecting forcibly, the timeout. Compulsory in that case.
+ * Sets the strategy to determine how long to wait between retry attempts. By default, this streamer uses a
+ * Fibonacci-based strategy.
+ *
+ * @param retryWaitStrategy The retry wait strategy.
*/
- public void setDisconnectForciblyTimeout(Integer disconnectForciblyTimeout) {
- this.disconnectForciblyTimeout = disconnectForciblyTimeout;
+ public void setRetryWaitStrategy(WaitStrategy retryWaitStrategy) {
+ this.retryWaitStrategy = retryWaitStrategy;
}
/**
- * @return
+ * Gets the retry wait strategy.
+ *
+ * @return The retry wait strategy.
*/
public WaitStrategy getRetryWaitStrategy() {
return retryWaitStrategy;
}
/**
- * @param retryWaitStrategy The strategy to determine how long to wait between retry attempts.
- * By default, this streamer uses a Fibonacci-based strategy.
+ * Sets the strategy to determine when to stop retrying to (re-)connect. By default, we never stop.
+ *
+ * @param retryStopStrategy The retry stop strategy.
*/
- public void setRetryWaitStrategy(WaitStrategy retryWaitStrategy) {
- this.retryWaitStrategy = retryWaitStrategy;
+ public void setRetryStopStrategy(StopStrategy retryStopStrategy) {
+ this.retryStopStrategy = retryStopStrategy;
}
/**
- * @return
+ * Gets the retry stop strategy.
+ *
+ * @return The retry stop strategy.
*/
public StopStrategy getRetryStopStrategy() {
return retryStopStrategy;
}
/**
- * @param retryStopStrategy The strategy to determine when to stop retrying to (re-)connect. By default, we never stop.
+ * Sets whether to block the start() method until connected for the first time. By default, it's <tt>false</tt>.
+ *
+ * @param blockUntilConnected Whether to block or not.
*/
- public void setRetryStopStrategy(StopStrategy retryStopStrategy) {
- this.retryStopStrategy = retryStopStrategy;
+ public void setBlockUntilConnected(boolean blockUntilConnected) {
+ this.blockUntilConnected = blockUntilConnected;
}
- /**
- * @return
- */
public boolean isBlockUntilConnected() {
return blockUntilConnected;
}
/**
- * @param blockUntilConnected Whether to block the start() method until connected for the first time. By default,
- * false.
+ * Returns whether this streamer is stopped.
+ *
+ * @return <tt>true</tt> if stopped; <tt>false</tt> if not.
*/
- public void setBlockUntilConnected(boolean blockUntilConnected) {
- this.blockUntilConnected = blockUntilConnected;
+ public boolean isStopped() {
+ return stopped;
+ }
+
+ /**
+ * Returns whether this streamer is connected.
+ *
+ * @return <tt>true</tt> if connected; <tt>false</tt> if not.
+ */
+ public boolean isConnected() {
+ return connected;
}
/**
@@ -551,7 +644,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
}
/**
- * Method that is called by the streamer to ask us to (re-)connect.
+ * Method called by the streamer to ask us to (re-)connect.
*/
public void connect() {
Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>() {
@@ -576,12 +669,15 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
else {
int[] qoses = new int[qualitiesOfService.size()];
+
for (int i = 0; i < qualitiesOfService.size(); i++)
qoses[i] = qualitiesOfService.get(i);
client.subscribe(topics.toArray(new String[0]), qoses);
}
+ log.info("MQTT Streamer (re-)connected and subscribed " + cachedLogValues);
+
connected = true;
return connected;
}
@@ -608,4 +704,4 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cb0d432f/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
index 5ac7339..76404b8 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -105,6 +105,10 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
super(true);
}
+ /**
+ *
+ * @throws Exception
+ */
@Before @SuppressWarnings("unchecked")
public void beforeTest() throws Exception {
grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
@@ -123,7 +127,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
PolicyMap policyMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry();
+
policy.setQueuePrefetch(1);
+
broker.setDestinationPolicy(policyMap);
broker.getDestinationPolicy().setDefaultEntry(policy);
broker.setSchedulerSupport(false);
@@ -138,13 +144,19 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
// create the client and connect
client = new MqttClient(brokerUrl, UUID.randomUUID().toString(), new MemoryPersistence());
+
client.connect();
// create mqtt streamer
dataStreamer = grid().dataStreamer(null);
+
streamer = createMqttStreamer(dataStreamer);
}
+ /**
+ *
+ * @throws Exception
+ */
@After
public void afterTest() throws Exception {
try {
@@ -160,7 +172,6 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
broker.stop();
broker.deleteAllMessages();
-
}
/**
@@ -327,7 +338,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
broker.stop();
broker.start(true);
broker.waitUntilStarted();
+
Thread.sleep(2000);
+
client.connect();
// let's ensure we have 2 connections: Ignite and our test
@@ -370,14 +383,17 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
// now shutdown the broker, wait 2 seconds and start it again
broker.stop();
broker.start(true);
+
broker.waitUntilStarted();
+
client.connect();
// lets send messages and ensure they are not received, because our retrier desisted
sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false);
+
Thread.sleep(3000);
- assertNull(grid().cache(null).get(50));
+ assertNull(grid().cache(null).get(50));
}
/**
@@ -422,8 +438,8 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
catch (Exception e) {
return;
}
- fail("Expected an exception reporting invalid parameters");
+ fail("Expected an exception reporting invalid parameters");
}
/**
@@ -449,28 +465,34 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
private void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
if (singleMessage) {
final List<StringBuilder> sbs = new ArrayList<>(topics.size());
+
// initialize String Builders for each topic
F.forEach(topics, new IgniteInClosure<String>() {
@Override public void apply(String s) {
sbs.add(new StringBuilder());
}
});
+
// fill String Builders for each topic
F.forEach(F.range(fromIdx, fromIdx + count), new IgniteInClosure<Integer>() {
@Override public void apply(Integer integer) {
sbs.get(integer % topics.size()).append(integer.toString() + "," + TEST_DATA.get(integer) + "\n");
}
});
+
// send each buffer out
for (int i = 0; i < topics.size(); i++) {
MqttMessage msg = new MqttMessage(sbs.get(i).toString().getBytes());
+
client.publish(topics.get(i % topics.size()), msg);
}
}
else {
for (int i = fromIdx; i < fromIdx + count; i++) {
byte[] payload = (i + "," + TEST_DATA.get(i)).getBytes();
+
MqttMessage msg = new MqttMessage(payload);
+
client.publish(topics.get(i % topics.size()), msg);
}
}
@@ -487,6 +509,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
@SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> callback = new IgniteBiPredicate<UUID, CacheEvent>() {
@Override public boolean apply(UUID uuid, CacheEvent evt) {
latch.countDown();
+
return true;
}
};
@@ -522,6 +545,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
return new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
@Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
List<String> s = Splitter.on(",").splitToList(new String(msg.getPayload()));
+
return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
}
};
@@ -539,15 +563,18 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
.omitEmptyStrings()
.withKeyValueSeparator(",")
.split(new String(msg.getPayload()));
+
final Map<Integer, String> answer = new HashMap<>();
+
F.forEach(map.keySet(), new IgniteInClosure<String>() {
@Override public void apply(String s) {
answer.put(Integer.parseInt(s), map.get(s));
}
});
+
return answer;
}
};
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cb0d432f/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
index ff25145..413eaab 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
@@ -31,4 +31,4 @@ import org.junit.runners.Suite;
})
public class IgniteMqttStreamerTestSuite {
-}
\ No newline at end of file
+}