You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:33 UTC

[03/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-connector-rabbitmq

[FLINK-6711] Activate strict checkstyle for flink-connector-rabbitmq


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d4f73391
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d4f73391
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d4f73391

Branch: refs/heads/master
Commit: d4f73391708bdfe466a5c3c771bb02f0fc3e1d03
Parents: 7ac4a24
Author: zentol <ch...@apache.org>
Authored: Wed May 24 22:49:55 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:06 2017 +0200

----------------------------------------------------------------------
 .../streaming/connectors/rabbitmq/RMQSink.java  | 17 ++++----
 .../connectors/rabbitmq/RMQSource.java          | 27 ++++++------
 .../rabbitmq/common/RMQConnectionConfig.java    | 46 ++++++++++++--------
 .../connectors/rabbitmq/RMQSourceTest.java      | 23 +++++-----
 .../common/RMQConnectionConfigTest.java         |  5 ++-
 .../connectors/rabbitmq/common/RMQSinkTest.java | 19 +++++---
 6 files changed, 78 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d4f73391/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index a0795d6..48675c5 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -17,21 +17,21 @@
 
 package org.apache.flink.streaming.connectors.rabbitmq;
 
-import java.io.IOException;
-
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
 
 /**
- * A Sink for publishing data into RabbitMQ
+ * A Sink for publishing data into RabbitMQ.
  * @param <IN>
  */
 public class RMQSink<IN> extends RichSinkFunction<IN> {
@@ -78,7 +78,6 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 		this.logFailuresOnly = logFailuresOnly;
 	}
 
-
 	@Override
 	public void open(Configuration config) throws Exception {
 		ConnectionFactory factory = rmqConnectionConfig.getConnectionFactory();
@@ -110,7 +109,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 			if (logFailuresOnly) {
 				LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost(), e);
 			} else {
-				throw new RuntimeException("Cannot send RMQ message " + queueName +" at " + rmqConnectionConfig.getHost(), e);
+				throw new RuntimeException("Cannot send RMQ message " + queueName + " at " + rmqConnectionConfig.getHost(), e);
 			}
 		}
 
@@ -128,12 +127,12 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 		try {
 			connection.close();
 		} catch (IOException e) {
-			if(t != null) {
+			if (t != null) {
 				LOG.warn("Both channel and connection closing failed. Logging channel exception and failing with connection exception", t);
 			}
 			t = e;
 		}
-		if(t != null) {
+		if (t != null) {
 			throw new RuntimeException("Error while closing RMQ connection with " + queueName
 					+ " at " + rmqConnectionConfig.getHost(), t);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d4f73391/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index ee9c3b9..12a35f1 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -17,9 +17,6 @@
 
 package org.apache.flink.streaming.connectors.rabbitmq;
 
-import java.io.IOException;
-import java.util.List;
-
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -38,30 +35,33 @@ import com.rabbitmq.client.QueueingConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.List;
+
 /**
  * RabbitMQ source (consumer) which reads from a queue and acknowledges messages on checkpoints.
  * When checkpointing is enabled, it guarantees exactly-once processing semantics.
  *
- * RabbitMQ requires messages to be acknowledged. On failures, RabbitMQ will re-resend all messages
+ * <p>RabbitMQ requires messages to be acknowledged. On failures, RabbitMQ will resend all messages
  * which have not been acknowledged previously. When a failure occurs directly after a completed
  * checkpoint, all messages part of this checkpoint might be processed again because they couldn't
  * be acknowledged before failure. This case is handled by the {@link MessageAcknowledgingSourceBase}
  * base class which deduplicates the messages using the correlation id.
  *
- * RabbitMQ's Delivery Tags do NOT represent unique ids / offsets. That's why the source uses the
+ * <p>RabbitMQ's Delivery Tags do NOT represent unique ids / offsets. That's why the source uses the
  * Correlation ID in the message properties to check for duplicate messages. Note that the
  * correlation id has to be set at the producer. If the correlation id is not set, messages may
  * be produced more than once in corner cases.
  *
- * This source can be operated in three different modes:
+ * <p>This source can be operated in three different modes:
  *
- * 1) Exactly-once (when checkpointed) with RabbitMQ transactions and messages with
+ * <p>1) Exactly-once (when checkpointed) with RabbitMQ transactions and messages with
  *    unique correlation IDs.
  * 2) At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism
  *    (correlation id is not set).
  * 3) No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
  *
- * Users may overwrite the setupConnectionFactory() method to pass their setup their own
+ * <p>Users may override the setupConnectionFactory() method to set up their own
  * ConnectionFactory in case the constructor parameters are not sufficient.
  *
  * @param <OUT> The type of the data read from RabbitMQ.
@@ -89,9 +89,9 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	/**
 	 * Creates a new RabbitMQ source with at-least-once message processing guarantee when
 	 * checkpointing is enabled. No strong delivery guarantees when checkpointing is disabled.
-	 * For exactly-once, please use the constructor
-	 * {@link RMQSource#RMQSource(RMQConnectionConfig, String, boolean usesCorrelationId, DeserializationSchema)},
-	 * set {@param usesCorrelationId} to true and enable checkpointing.
+	 *
+	 * <p>For exactly-once, please use the constructor
+	 * {@link RMQSource#RMQSource(RMQConnectionConfig, String, boolean, DeserializationSchema)}.
 	 * @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}.
 	 * @param queueName  The queue to receive messages from.
 	 * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
@@ -105,7 +105,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	/**
 	 * Creates a new RabbitMQ source. For exactly-once, you must set the correlation ids of messages
 	 * at the producer. The correlation id must be unique. Otherwise the behavior of the source is
-	 * undefined. In doubt, set {@param usesCorrelationId} to false. When correlation ids are not
+	 * undefined. If in doubt, set usesCorrelationId to false. When correlation ids are not
 	 * used, this source has at-least-once processing semantics when checkpointing is enabled.
 	 * @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}.
 	 * @param queueName The queue to receive messages from.
@@ -116,7 +116,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	 *                              into Java objects.
 	 */
 	public RMQSource(RMQConnectionConfig rmqConnectionConfig,
-					String queueName, boolean usesCorrelationId,DeserializationSchema<OUT> deserializationSchema) {
+					String queueName, boolean usesCorrelationId, DeserializationSchema<OUT> deserializationSchema) {
 		super(String.class);
 		this.rmqConnectionConfig = rmqConnectionConfig;
 		this.queueName = queueName;
@@ -185,7 +185,6 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 		}
 	}
 
-
 	@Override
 	public void run(SourceContext<OUT> ctx) throws Exception {
 		while (running) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d4f73391/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
index 72bac1c..cce800a 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
@@ -17,8 +17,9 @@
 
 package org.apache.flink.streaming.connectors.rabbitmq.common;
 
-import com.rabbitmq.client.ConnectionFactory;
 import org.apache.flink.util.Preconditions;
+
+import com.rabbitmq.client.ConnectionFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -176,7 +177,7 @@ public class RMQConnectionConfig implements Serializable {
 	}
 
 	/**
-	 * Returns true if automatic connection recovery is enabled, false otherwise
+	 * Returns true if automatic connection recovery is enabled, false otherwise.
 	 * @return true if automatic connection recovery is enabled, false otherwise
 	 */
 	public Boolean isAutomaticRecovery() {
@@ -184,7 +185,7 @@ public class RMQConnectionConfig implements Serializable {
 	}
 
 	/**
-	 * Returns true if topology recovery is enabled, false otherwise
+	 * Returns true if topology recovery is enabled, false otherwise.
 	 * @return true if topology recovery is enabled, false otherwise
 	 */
 	public Boolean isTopologyRecovery() {
@@ -200,7 +201,7 @@ public class RMQConnectionConfig implements Serializable {
 	}
 
 	/**
-	 * Retrieve the requested maximum channel number
+	 * Retrieve the requested maximum channel number.
 	 * @return the initially requested maximum channel number; zero for unlimited
 	 */
 	public Integer getRequestedChannelMax() {
@@ -208,7 +209,7 @@ public class RMQConnectionConfig implements Serializable {
 	}
 
 	/**
-	 * Retrieve the requested maximum frame size
+	 * Retrieve the requested maximum frame size.
 	 * @return the initially requested maximum frame size, in octets; zero for unlimited
 	 */
 	public Integer getRequestedFrameMax() {
@@ -226,7 +227,9 @@ public class RMQConnectionConfig implements Serializable {
 	/**
 	 *
 	 * @return Connection Factory for RMQ
-	 * @throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException if Malformed URI has been passed
+	 * @throws URISyntaxException if a malformed URI has been passed
+	 * @throws NoSuchAlgorithmException if the ssl factory could not be created
+	 * @throws KeyManagementException if the ssl context could not be initialized
 	 */
 	public ConnectionFactory getConnectionFactory() throws URISyntaxException,
 		NoSuchAlgorithmException, KeyManagementException {
@@ -234,9 +237,17 @@ public class RMQConnectionConfig implements Serializable {
 		if (this.uri != null && !this.uri.isEmpty()){
 			try {
 				factory.setUri(this.uri);
-			} catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e) {
+			} catch (URISyntaxException e) {
 				LOG.error("Failed to parse uri", e);
 				throw e;
+			} catch (KeyManagementException e) {
+				// this should never happen
+				LOG.error("Failed to initialize ssl context.", e);
+				throw e;
+			} catch (NoSuchAlgorithmException e) {
+				// this should never happen
+				LOG.error("Failed to setup ssl factory.", e);
+				throw e;
 			}
 		} else {
 			factory.setHost(this.host);
@@ -272,7 +283,7 @@ public class RMQConnectionConfig implements Serializable {
 	}
 
 	/**
-	 * The Builder Class for {@link RMQConnectionConfig}
+	 * The Builder Class for {@link RMQConnectionConfig}.
 	 */
 	public static class Builder {
 
@@ -355,7 +366,7 @@ public class RMQConnectionConfig implements Serializable {
 		}
 
 		/**
-		 * Enables or disables topology recovery
+		 * Enables or disables topology recovery.
 		 * @param topologyRecovery if true, enables topology recovery
 		 * @return the Builder
 		 */
@@ -375,7 +386,7 @@ public class RMQConnectionConfig implements Serializable {
 		}
 
 		/**
-		 * Set the requested maximum frame size
+		 * Set the requested maximum frame size.
 		 * @param requestedFrameMax initially requested maximum frame size, in octets; zero for unlimited
 		 * @return the Builder
 		 */
@@ -385,7 +396,7 @@ public class RMQConnectionConfig implements Serializable {
 		}
 
 		/**
-		 * Set the requested maximum channel number
+		 * Set the requested maximum channel number.
 		 * @param requestedChannelMax initially requested maximum channel number; zero for unlimited
 		 */
 		public Builder setRequestedChannelMax(int requestedChannelMax) {
@@ -414,7 +425,7 @@ public class RMQConnectionConfig implements Serializable {
 		}
 
 		/**
-		 * Enables or disables automatic connection recovery
+		 * Enables or disables automatic connection recovery.
 		 * @param automaticRecovery if true, enables connection recovery
 		 * @return the Builder
 		 */
@@ -424,17 +435,18 @@ public class RMQConnectionConfig implements Serializable {
 		}
 
 		/**
-		 * The Builder method
-		 * If URI is NULL we use host, port, vHost, username, password combination
+		 * The Builder method.
+		 *
+		 * <p>If URI is NULL we use host, port, vHost, username, password combination
 		 * to initialize connection. using  {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String,
-		 * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}
+		 * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}.
 		 *
-		 * else URI will be used to initialize the client connection
+		 * <p>Otherwise the URI will be used to initialize the client connection
 		 * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}
 		 * @return RMQConnectionConfig
 		 */
 		public RMQConnectionConfig build(){
-			if(this.uri != null) {
+			if (this.uri != null) {
 				return new RMQConnectionConfig(this.uri, this.networkRecoveryInterval,
 					this.automaticRecovery, this.topologyRecovery, this.connectionTimeout, this.requestedChannelMax,
 					this.requestedFrameMax, this.requestedHeartbeat);

http://git-wip-us.apache.org/repos/asf/flink/blob/d4f73391/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index b65ddf0..05ae810 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -17,12 +17,6 @@
 
 package org.apache.flink.streaming.connectors.rabbitmq;
 
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.QueueingConsumer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -39,6 +33,13 @@ import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.QueueingConsumer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -59,7 +60,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 
-
 /**
  * Tests for the RMQSource. The source supports two operation modes.
  * 1) Exactly-once (when checkpointed) with RabbitMQ transactions and the deduplication mechanism in
@@ -67,7 +67,7 @@ import static org.mockito.Matchers.any;
  * 2) At-least-once (when checkpointed) with RabbitMQ transactions but not deduplication.
  * 3) No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
  *
- * This tests assumes that the message ids are increasing monotonously. That doesn't have to be the
+ * <p>This test assumes that the message ids are increasing monotonically. That doesn't have to be the
  * case. The correlation id is used to uniquely identify messages.
  */
 @RunWith(PowerMockRunner.class)
@@ -156,7 +156,7 @@ public class RMQSourceTest {
 
 		long totalNumberOfAcks = 0;
 
-		for (int i=0; i < numSnapshots; i++) {
+		for (int i = 0; i < numSnapshots; i++) {
 			long snapshotId = random.nextLong();
 			OperatorStateHandles data;
 
@@ -230,9 +230,8 @@ public class RMQSourceTest {
 		}
 	}
 
-
 	/**
-	 * The source should not acknowledge ids in auto-commit mode or check for previously acknowledged ids
+	 * The source should not acknowledge ids in auto-commit mode or check for previously acknowledged ids.
 	 */
 	@Test
 	public void testCheckpointingDisabled() throws Exception {
@@ -248,7 +247,7 @@ public class RMQSourceTest {
 	}
 
 	/**
-	 * Tests error reporting in case of invalid correlation ids
+	 * Tests error reporting in case of invalid correlation ids.
 	 */
 	@Test
 	public void testCorrelationIdNotSet() throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/flink/blob/d4f73391/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
index 40985ce..9cfac92 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
@@ -26,7 +26,9 @@ import java.security.NoSuchAlgorithmException;
 
 import static org.junit.Assert.assertEquals;
 
-
+/**
+ * Tests for the {@link RMQConnectionConfig}.
+ */
 public class RMQConnectionConfigTest {
 
 	@Test(expected = NullPointerException.class)
@@ -37,6 +39,7 @@ public class RMQConnectionConfigTest {
 			.setPassword("guest").setVirtualHost("/").build();
 		connectionConfig.getConnectionFactory();
 	}
+
 	@Test(expected = NullPointerException.class)
 	public void shouldThrowNullPointExceptionIfPortIsNull() throws NoSuchAlgorithmException,
 		KeyManagementException, URISyntaxException {

http://git-wip-us.apache.org/repos/asf/flink/blob/d4f73391/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
index 199cd1e..540a7ba 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
@@ -17,20 +17,28 @@
 
 package org.apache.flink.streaming.connectors.rabbitmq.common;
 
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.*;
-
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RMQSink}.
+ */
 public class RMQSinkTest {
 
 	private static final String QUEUE_NAME = "queue";
@@ -43,7 +51,6 @@ public class RMQSinkTest {
 	private Channel channel;
 	private SerializationSchema<String> serializationSchema;
 
-
 	@Before
 	public void before() throws Exception {
 		serializationSchema = spy(new DummySerializationSchema());