You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/28 11:18:07 UTC

[02/33] ignite git commit: review

review


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/056490d2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/056490d2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/056490d2

Branch: refs/heads/ignite-1093-3
Commit: 056490d239c46a3950234ce7ea9afc4b203a00c3
Parents: cb0d432
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Thu Oct 22 13:52:43 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Thu Oct 22 13:52:43 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/stream/mqtt/MqttStreamer.java | 125 ++++++++++---------
 .../stream/mqtt/IgniteMqttStreamerTest.java     |  71 +++++------
 .../mqtt/IgniteMqttStreamerTestSuite.java       |   2 -
 3 files changed, 102 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/056490d2/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
index 39d8d6e..a075695 100644
--- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -74,10 +74,8 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
  * {@link #setConnectOptions(MqttConnectOptions)} setter.
  *
  * @see <a href="https://github.com/rholder/guava-retrying">guava-retrying library</a>
- *
  */
 public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> implements MqttCallback {
-
     /** Logger. */
     private IgniteLogger log;
 
@@ -96,8 +94,10 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
     /** The topics to subscribe to, if many. */
     private List<String> topics;
 
-    /** The qualities of service to use for multiple topic subscriptions. If specified, it must contain the same
-     *  number of elements as {@link #topics}. */
+    /**
+     * The qualities of service to use for multiple topic subscriptions. If specified, it must contain the same
+     * number of elements as {@link #topics}.
+     */
     private List<Integer> qualitiesOfService;
 
     /** The MQTT client ID (optional). */
@@ -118,8 +118,10 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
     /** If disconnecting forcibly, the timeout. */
     private Integer disconnectForciblyTimeout;
 
-    /** The strategy to determine how long to wait between retry attempts. By default, this streamer uses a
-     *  Fibonacci-based strategy. */
+    /**
+     * The strategy to determine how long to wait between retry attempts. By default, this streamer uses a
+     * Fibonacci-based strategy.
+     */
     private WaitStrategy retryWaitStrategy = WaitStrategies.fibonacciWait();
 
     /** The strategy to determine when to stop retrying to (re-)connect. By default, we never stop. */
@@ -149,7 +151,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
         if (!stopped)
             throw new IgniteException("Attempted to start an already started MQTT Streamer");
 
-        // for simplicity, if these are null initialize to empty lists
+        // For simplicity, if these are null initialize to empty lists.
         topics = topics == null ? new ArrayList<String>() : topics;
 
         qualitiesOfService = qualitiesOfService == null ? new ArrayList<Integer>() : qualitiesOfService;
@@ -157,45 +159,47 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
         try {
             Map<String, Object> logValues = new HashMap<>();
 
-            // parameter validations
+            // Parameter validations.
             A.notNull(getStreamer(), "streamer");
             A.notNull(getIgnite(), "ignite");
-            A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null), "tuple extractor missing");
-            A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " +
-                "both single and multiple tuple extractor");
+            A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null),
+                "tuple extractor missing");
+            A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null,
+                "cannot provide both single and multiple tuple extractor");
             A.notNullOrEmpty(brokerUrl, "broker URL");
 
-            // if the client ID is empty, generate one
+            // If the client ID is empty, generate one.
             if (clientId == null || clientId.length() == 0)
                 clientId = MqttClient.generateClientId();
 
-            // if we have both a single topic and a list of topics (but the list of topic is not of
-            // size 1 and == topic, as this would be a case of re-initialization), fail
+            // If we have both a single topic and a list of topics (but the list of topic is not of
+            // size 1 and == topic, as this would be a case of re-initialization), fail.
             if (topic != null && topic.length() > 0 && !topics.isEmpty() &&
                 topics.size() != 1 && !topics.get(0).equals(topic))
-                throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time");
+                throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time.");
 
-            // same as above but for QoS
+            // Same as above but for QoS.
             if (qualityOfService != null && !qualitiesOfService.isEmpty() && qualitiesOfService.size() != 1 &&
                 !qualitiesOfService.get(0).equals(qualityOfService))
-                throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time");
+                throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time.");
 
-            // Paho API requires disconnect timeout if providing a quiesce timeout and disconnecting forcibly
+            // Paho API requires disconnect timeout if providing a quiesce timeout and disconnecting forcibly.
             if (disconnectForcibly && disconnectQuiesceTimeout != null)
                 A.notNull(disconnectForciblyTimeout, "disconnect timeout cannot be null when disconnecting forcibly " +
                     "with quiesce");
 
-            // if we have multiple topics
+            // If we have multiple topics.
             if (!topics.isEmpty()) {
                 for (String t : topics)
                     A.notNullOrEmpty(t, "topic in list of topics");
 
-                A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(), "qualities of " +
-                    "service must be either empty or have the same size as topics list");
+                A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(),
+                    "qualities of service must be either empty or have the same size as topics list");
 
                 logValues.put("topics", topics);
             }
-            else {  // just the single topic
+            else {
+                // Just the single topic.
                 topics.add(topic);
 
                 if (qualityOfService != null)
@@ -204,29 +208,29 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
                 logValues.put("topic", topic);
             }
 
-            // finish building log values
+            // Finish building log values.
             logValues.put("brokerUrl", brokerUrl);
             logValues.put("clientId", clientId);
 
-            // cache log values
+            // Cache log values.
             cachedLogValues = "[" + Joiner.on(", ").withKeyValueSeparator("=").join(logValues) + "]";
 
-            // create logger
+            // Create logger.
             log = getIgnite().log();
 
-            // create the mqtt client
+            // Create the MQTT client.
             if (persistence == null)
                 client = new MqttClient(brokerUrl, clientId);
             else
                 client = new MqttClient(brokerUrl, clientId, persistence);
 
-            // set this as a callback
+            // Set this as a callback.
             client.setCallback(this);
 
-            // set stopped to false, as the connection will start async
+            // Set stopped to false, as the connection will start async.
             stopped = false;
 
-            // build retrier
+            // Build retrier.
             Retryer<Boolean> retrier = RetryerBuilder.<Boolean>newBuilder()
                 .retryIfResult(new Predicate<Boolean>() {
                     @Override public boolean apply(Boolean connected) {
@@ -238,29 +242,29 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
                 .withStopStrategy(retryStopStrategy)
                 .build();
 
-            // create the connection retrier
+            // Create the connection retrier.
             connectionRetrier = new MqttConnectionRetrier(retrier);
 
             log.info("Starting MQTT Streamer " + cachedLogValues);
 
-            // connect
+            // Connect.
             connectionRetrier.connect();
-
         }
-        catch (Throwable t) {
-            throw new IgniteException("Exception while initializing MqttStreamer", t);
+        catch (Exception e) {
+            throw new IgniteException("Failed to initialize MQTT Streamer.", e);
         }
-
     }
 
     /**
      * Stops streamer.
+     *
+     * @throws IgniteException If failed.
      */
     public void stop() throws IgniteException {
         if (stopped)
-            throw new IgniteException("Attempted to stop an already stopped MQTT Streamer");
+            throw new IgniteException("Failed to stop MQTT Streamer (already stopped).");
 
-        // stop the retrier
+        // Stop the retrier.
         connectionRetrier.stop();
 
         try {
@@ -273,23 +277,22 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
 
                 else
                     client.disconnectForcibly(disconnectQuiesceTimeout, disconnectForciblyTimeout);
-
-            } else {
+            }
+            else {
                 if (disconnectQuiesceTimeout == null)
                     client.disconnect();
 
                 else
                     client.disconnect(disconnectQuiesceTimeout);
-
             }
 
             client.close();
+
             connected = false;
             stopped = true;
-
         }
-        catch (Throwable t) {
-            throw new IgniteException("Exception while stopping MqttStreamer", t);
+        catch (Exception e) {
+            throw new IgniteException("Failed to stop Exception while stopping MQTT Streamer.", e);
         }
     }
 
@@ -502,9 +505,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
     }
 
     /**
-     * Sets whether to disconnect forcibly or not when shutting down. By default, it's <tt>false</tt>.
+     * Sets whether to disconnect forcibly or not when shutting down. By default, it's {@code false}.
      *
-     * @param disconnectForcibly Whether to disconnect forcibly or not. By default, it's <tt>false</tt>.
+     * @param disconnectForcibly Whether to disconnect forcibly or not. By default, it's {@code false}.
      */
     public void setDisconnectForcibly(boolean disconnectForcibly) {
         this.disconnectForcibly = disconnectForcibly;
@@ -593,7 +596,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
     }
 
     /**
-     * Sets whether to block the start() method until connected for the first time. By default, it's <tt>false</tt>.
+     * Sets whether to block the start() method until connected for the first time. By default, it's {@code false}.
      *
      * @param blockUntilConnected Whether to block or not.
      */
@@ -601,6 +604,11 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
         this.blockUntilConnected = blockUntilConnected;
     }
 
+    /**
+     * Gets whether to block the start() method until connected for the first time. By default, it's {@code false}.
+     *
+     * @return {@code true} if should connect synchronously in start.
+     */
     public boolean isBlockUntilConnected() {
         return blockUntilConnected;
     }
@@ -608,7 +616,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
     /**
      * Returns whether this streamer is stopped.
      *
-     * @return <tt>true</tt> if stopped; <tt>false</tt> if not.
+     * @return {@code true} if stopped; {@code false} if not.
      */
     public boolean isStopped() {
         return stopped;
@@ -617,7 +625,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
     /**
      * Returns whether this streamer is connected.
      *
-     * @return <tt>true</tt> if connected; <tt>false</tt> if not.
+     * @return {@code true} if connected; {@code false} if not.
      */
     public boolean isConnected() {
         return connected;
@@ -628,15 +636,15 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
      * the (re-)connections.
      */
     private class MqttConnectionRetrier {
-
         /** The guava-retrying retrier object. */
         private final Retryer<Boolean> retrier;
 
         /** Single-threaded pool. */
-        private ExecutorService executor = Executors.newSingleThreadExecutor();
+        private ExecutorService exec = Executors.newSingleThreadExecutor();
 
         /**
          * Constructor.
+         *
          * @param retrier The retryier object.
          */
         public MqttConnectionRetrier(Retryer<Boolean> retrier) {
@@ -649,21 +657,21 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
         public void connect() {
             Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>() {
                 @Override public Boolean call() throws Exception {
-                    // if we're already connected, return immediately
+                    // If we're already connected, return immediately.
                     if (connected)
                         return true;
 
                     if (stopped)
                         return false;
 
-                    // connect to broker
+                    // Connect to broker.
                     if (connectOptions == null)
                         client.connect();
                     else
                         client.connect(connectOptions);
 
-                    // always use the multiple topics variant of the mqtt client; even if the user specified a single
-                    // topic and/or QoS, the initialization code would have placed it inside the 1..n structures
+                    // Always use the multiple topics variant of the mqtt client; even if the user specified a single
+                    // topic and/or QoS, the initialization code would have placed it inside the 1..n structures.
                     if (qualitiesOfService.isEmpty())
                         client.subscribe(topics.toArray(new String[0]));
 
@@ -679,11 +687,12 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
                     log.info("MQTT Streamer (re-)connected and subscribed " + cachedLogValues);
 
                     connected = true;
-                    return connected;
+
+                    return true;
                 }
             });
 
-            Future<Boolean> result = executor.submit(callable);
+            Future<Boolean> result = exec.submit(callable);
 
             if (blockUntilConnected) {
                 try {
@@ -699,9 +708,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
          * Stops this connection utility class by shutting down the thread pool.
          */
         public void stop() {
-            executor.shutdownNow();
+            exec.shutdownNow();
         }
-
     }
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/056490d2/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
index 76404b8..6b07fde 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -60,11 +60,8 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
 
 /**
  * Test for {@link MqttStreamer}.
- *
- * @author Raul Kripalani
  */
 public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
-
     /** The test data. */
     private static final Map<Integer, String> TEST_DATA = new HashMap<>();
 
@@ -106,8 +103,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
-     * @throws Exception
+     * @throws Exception If failed.
      */
     @Before @SuppressWarnings("unchecked")
     public void beforeTest() throws Exception {
@@ -154,8 +150,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
-     * @throws Exception
+     * @throws Exception If failed.
      */
     @After
     public void afterTest() throws Exception {
@@ -175,7 +170,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testSingleTopic_NoQoS_OneEntryPerMessage() throws Exception {
         // configure streamer
@@ -197,7 +192,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testMultipleTopics_NoQoS_OneEntryPerMessage() throws Exception {
         // configure streamer
@@ -223,7 +218,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testSingleTopic_NoQoS_MultipleEntriesOneMessage() throws Exception {
         // configure streamer
@@ -245,7 +240,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testMultipleTopics_NoQoS_MultipleEntriesOneMessage() throws Exception {
         // configure streamer
@@ -271,7 +266,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testSingleTopic_NoQoS_ConnectOptions_Durable() throws Exception {
         // configure streamer
@@ -312,7 +307,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testSingleTopic_NoQoS_Reconnect() throws Exception {
         // configure streamer
@@ -358,7 +353,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testSingleTopic_NoQoS_RetryOnce() throws Exception {
         // configure streamer
@@ -397,7 +392,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testMultipleTopics_MultipleQoS_OneEntryPerMessage() throws Exception {
         // configure streamer
@@ -424,7 +419,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testMultipleTopics_MultipleQoS_Mismatch() throws Exception {
         // configure streamer
@@ -443,7 +438,8 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @param dataStreamer Streamer.
+     * @return MQTT streamer.
      */
     private MqttStreamer<Integer, String> createMqttStreamer(IgniteDataStreamer<Integer, String> dataStreamer) {
         MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
@@ -460,10 +456,15 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @param topics Topics.
+     * @param fromIdx From index.
+     * @param cnt Count.
+     * @param singleMsg Single message flag.
+     * @throws MqttException If failed.
      */
-    private void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
-        if (singleMessage) {
+    private void sendMessages(final List<String> topics, int fromIdx, int cnt, boolean singleMsg)
+        throws MqttException {
+        if (singleMsg) {
             final List<StringBuilder> sbs = new ArrayList<>(topics.size());
 
             // initialize String Builders for each topic
@@ -474,7 +475,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
             });
 
             // fill String Builders for each topic
-            F.forEach(F.range(fromIdx, fromIdx + count), new IgniteInClosure<Integer>() {
+            F.forEach(F.range(fromIdx, fromIdx + cnt), new IgniteInClosure<Integer>() {
                 @Override public void apply(Integer integer) {
                     sbs.get(integer % topics.size()).append(integer.toString() + "," + TEST_DATA.get(integer) + "\n");
                 }
@@ -488,7 +489,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
             }
         }
         else {
-            for (int i = fromIdx; i < fromIdx + count; i++) {
+            for (int i = fromIdx; i < fromIdx + cnt; i++) {
                 byte[] payload = (i + "," + TEST_DATA.get(i)).getBytes();
 
                 MqttMessage msg = new MqttMessage(payload);
@@ -499,14 +500,16 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @param expect Expected count.
+     * @return Latch to be counted down in listener.
      */
     private CountDownLatch subscribeToPutEvents(int expect) {
         Ignite ignite = grid();
 
         // Listen to cache PUT events and expect as many as messages as test data items
         final CountDownLatch latch = new CountDownLatch(expect);
-        @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> callback = new IgniteBiPredicate<UUID, CacheEvent>() {
+
+        IgniteBiPredicate<UUID, CacheEvent> callback = new IgniteBiPredicate<UUID, CacheEvent>() {
             @Override public boolean apply(UUID uuid, CacheEvent evt) {
                 latch.countDown();
 
@@ -514,32 +517,32 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
             }
         };
 
-        remoteListener = ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(callback, null, EVT_CACHE_OBJECT_PUT);
+        remoteListener = ignite.events(ignite.cluster().forCacheNodes(null))
+            .remoteListen(callback, null, EVT_CACHE_OBJECT_PUT);
+
         return latch;
     }
 
     /**
-     * @throws Exception
+     * @param cnt Count.
      */
-    private void assertCacheEntriesLoaded(int count) {
+    private void assertCacheEntriesLoaded(int cnt) {
         // get the cache and check that the entries are present
         IgniteCache<Integer, String> cache = grid().cache(null);
 
         // for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache
-        for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count))
+        for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, cnt))
             assertEquals(TEST_DATA.get(key), cache.get(key));
 
         // assert that the cache exactly the specified amount of elements
-        assertEquals(count, cache.size(CachePeekMode.ALL));
+        assertEquals(cnt, cache.size(CachePeekMode.ALL));
 
         // remove the event listener
         grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener);
     }
 
     /**
-     * Returns a {@link StreamSingleTupleExtractor} for testing.
-     *
-     * @throws Exception
+     * @return {@link StreamSingleTupleExtractor} for testing.
      */
     public static StreamSingleTupleExtractor<MqttMessage, Integer, String> singleTupleExtractor() {
         return new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
@@ -552,9 +555,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Returns a {@link StreamMultipleTupleExtractor} for testing.
-     *
-     * @throws Exception
+     * @return {@link StreamMultipleTupleExtractor} for testing.
      */
     public static StreamMultipleTupleExtractor<MqttMessage, Integer, String> multipleTupleExtractor() {
         return new StreamMultipleTupleExtractor<MqttMessage, Integer, String>() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/056490d2/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
index 413eaab..ed0c2f7 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
@@ -22,8 +22,6 @@ import org.junit.runners.Suite;
 
 /**
  * MQTT streamer tests.
- *
- * @author Raul Kripalani
  */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({