You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:33 UTC
[03/21] flink git commit: [FLINK-6711] Activate strict checkstyle for
flink-connector-rabbitmq
[FLINK-6711] Activate strict checkstyle for flink-connector-rabbitmq
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d4f73391
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d4f73391
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d4f73391
Branch: refs/heads/master
Commit: d4f73391708bdfe466a5c3c771bb02f0fc3e1d03
Parents: 7ac4a24
Author: zentol <ch...@apache.org>
Authored: Wed May 24 22:49:55 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:06 2017 +0200
----------------------------------------------------------------------
.../streaming/connectors/rabbitmq/RMQSink.java | 17 ++++----
.../connectors/rabbitmq/RMQSource.java | 27 ++++++------
.../rabbitmq/common/RMQConnectionConfig.java | 46 ++++++++++++--------
.../connectors/rabbitmq/RMQSourceTest.java | 23 +++++-----
.../common/RMQConnectionConfigTest.java | 5 ++-
.../connectors/rabbitmq/common/RMQSinkTest.java | 19 +++++---
6 files changed, 78 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d4f73391/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index a0795d6..48675c5 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -17,21 +17,21 @@
package org.apache.flink.streaming.connectors.rabbitmq;
-import java.io.IOException;
-
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
/**
- * A Sink for publishing data into RabbitMQ
+ * A Sink for publishing data into RabbitMQ.
* @param <IN>
*/
public class RMQSink<IN> extends RichSinkFunction<IN> {
@@ -78,7 +78,6 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
this.logFailuresOnly = logFailuresOnly;
}
-
@Override
public void open(Configuration config) throws Exception {
ConnectionFactory factory = rmqConnectionConfig.getConnectionFactory();
@@ -110,7 +109,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
if (logFailuresOnly) {
LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost(), e);
} else {
- throw new RuntimeException("Cannot send RMQ message " + queueName +" at " + rmqConnectionConfig.getHost(), e);
+ throw new RuntimeException("Cannot send RMQ message " + queueName + " at " + rmqConnectionConfig.getHost(), e);
}
}
@@ -128,12 +127,12 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
try {
connection.close();
} catch (IOException e) {
- if(t != null) {
+ if (t != null) {
LOG.warn("Both channel and connection closing failed. Logging channel exception and failing with connection exception", t);
}
t = e;
}
- if(t != null) {
+ if (t != null) {
throw new RuntimeException("Error while closing RMQ connection with " + queueName
+ " at " + rmqConnectionConfig.getHost(), t);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d4f73391/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index ee9c3b9..12a35f1 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -17,9 +17,6 @@
package org.apache.flink.streaming.connectors.rabbitmq;
-import java.io.IOException;
-import java.util.List;
-
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -38,30 +35,33 @@ import com.rabbitmq.client.QueueingConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.List;
+
/**
* RabbitMQ source (consumer) which reads from a queue and acknowledges messages on checkpoints.
* When checkpointing is enabled, it guarantees exactly-once processing semantics.
*
- * RabbitMQ requires messages to be acknowledged. On failures, RabbitMQ will re-resend all messages
+ * <p>RabbitMQ requires messages to be acknowledged. On failures, RabbitMQ will re-resend all messages
* which have not been acknowledged previously. When a failure occurs directly after a completed
* checkpoint, all messages part of this checkpoint might be processed again because they couldn't
* be acknowledged before failure. This case is handled by the {@link MessageAcknowledgingSourceBase}
* base class which deduplicates the messages using the correlation id.
*
- * RabbitMQ's Delivery Tags do NOT represent unique ids / offsets. That's why the source uses the
+ * <p>RabbitMQ's Delivery Tags do NOT represent unique ids / offsets. That's why the source uses the
* Correlation ID in the message properties to check for duplicate messages. Note that the
* correlation id has to be set at the producer. If the correlation id is not set, messages may
* be produced more than once in corner cases.
*
- * This source can be operated in three different modes:
+ * <p>This source can be operated in three different modes:
*
- * 1) Exactly-once (when checkpointed) with RabbitMQ transactions and messages with
+ * <p>1) Exactly-once (when checkpointed) with RabbitMQ transactions and messages with
* unique correlation IDs.
* 2) At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism
* (correlation id is not set).
* 3) No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
*
- * Users may overwrite the setupConnectionFactory() method to pass their setup their own
+ * <p>Users may overwrite the setupConnectionFactory() method to pass their setup their own
* ConnectionFactory in case the constructor parameters are not sufficient.
*
* @param <OUT> The type of the data read from RabbitMQ.
@@ -89,9 +89,9 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
/**
* Creates a new RabbitMQ source with at-least-once message processing guarantee when
* checkpointing is enabled. No strong delivery guarantees when checkpointing is disabled.
- * For exactly-once, please use the constructor
- * {@link RMQSource#RMQSource(RMQConnectionConfig, String, boolean usesCorrelationId, DeserializationSchema)},
- * set {@param usesCorrelationId} to true and enable checkpointing.
+ *
+ * <p>For exactly-once, please use the constructor
+ * {@link RMQSource#RMQSource(RMQConnectionConfig, String, boolean, DeserializationSchema)}.
* @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}.
* @param queueName The queue to receive messages from.
* @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
@@ -105,7 +105,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
/**
* Creates a new RabbitMQ source. For exactly-once, you must set the correlation ids of messages
* at the producer. The correlation id must be unique. Otherwise the behavior of the source is
- * undefined. In doubt, set {@param usesCorrelationId} to false. When correlation ids are not
+ * undefined. If in doubt, set usesCorrelationId to false. When correlation ids are not
* used, this source has at-least-once processing semantics when checkpointing is enabled.
* @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}.
* @param queueName The queue to receive messages from.
@@ -116,7 +116,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
* into Java objects.
*/
public RMQSource(RMQConnectionConfig rmqConnectionConfig,
- String queueName, boolean usesCorrelationId,DeserializationSchema<OUT> deserializationSchema) {
+ String queueName, boolean usesCorrelationId, DeserializationSchema<OUT> deserializationSchema) {
super(String.class);
this.rmqConnectionConfig = rmqConnectionConfig;
this.queueName = queueName;
@@ -185,7 +185,6 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
}
}
-
@Override
public void run(SourceContext<OUT> ctx) throws Exception {
while (running) {
http://git-wip-us.apache.org/repos/asf/flink/blob/d4f73391/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
index 72bac1c..cce800a 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
@@ -17,8 +17,9 @@
package org.apache.flink.streaming.connectors.rabbitmq.common;
-import com.rabbitmq.client.ConnectionFactory;
import org.apache.flink.util.Preconditions;
+
+import com.rabbitmq.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -176,7 +177,7 @@ public class RMQConnectionConfig implements Serializable {
}
/**
- * Returns true if automatic connection recovery is enabled, false otherwise
+ * Returns true if automatic connection recovery is enabled, false otherwise.
* @return true if automatic connection recovery is enabled, false otherwise
*/
public Boolean isAutomaticRecovery() {
@@ -184,7 +185,7 @@ public class RMQConnectionConfig implements Serializable {
}
/**
- * Returns true if topology recovery is enabled, false otherwise
+ * Returns true if topology recovery is enabled, false otherwise.
* @return true if topology recovery is enabled, false otherwise
*/
public Boolean isTopologyRecovery() {
@@ -200,7 +201,7 @@ public class RMQConnectionConfig implements Serializable {
}
/**
- * Retrieve the requested maximum channel number
+ * Retrieve the requested maximum channel number.
* @return the initially requested maximum channel number; zero for unlimited
*/
public Integer getRequestedChannelMax() {
@@ -208,7 +209,7 @@ public class RMQConnectionConfig implements Serializable {
}
/**
- * Retrieve the requested maximum frame size
+ * Retrieve the requested maximum frame size.
* @return the initially requested maximum frame size, in octets; zero for unlimited
*/
public Integer getRequestedFrameMax() {
@@ -226,7 +227,9 @@ public class RMQConnectionConfig implements Serializable {
/**
*
* @return Connection Factory for RMQ
- * @throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException if Malformed URI has been passed
+ * @throws URISyntaxException if Malformed URI has been passed
+ * @throws NoSuchAlgorithmException if the ssl factory could not be created
+ * @throws KeyManagementException if the ssl context could not be initialized
*/
public ConnectionFactory getConnectionFactory() throws URISyntaxException,
NoSuchAlgorithmException, KeyManagementException {
@@ -234,9 +237,17 @@ public class RMQConnectionConfig implements Serializable {
if (this.uri != null && !this.uri.isEmpty()){
try {
factory.setUri(this.uri);
- } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e) {
+ } catch (URISyntaxException e) {
LOG.error("Failed to parse uri", e);
throw e;
+ } catch (KeyManagementException e) {
+ // this should never happen
+ LOG.error("Failed to initialize ssl context.", e);
+ throw e;
+ } catch (NoSuchAlgorithmException e) {
+ // this should never happen
+ LOG.error("Failed to setup ssl factory.", e);
+ throw e;
}
} else {
factory.setHost(this.host);
@@ -272,7 +283,7 @@ public class RMQConnectionConfig implements Serializable {
}
/**
- * The Builder Class for {@link RMQConnectionConfig}
+ * The Builder Class for {@link RMQConnectionConfig}.
*/
public static class Builder {
@@ -355,7 +366,7 @@ public class RMQConnectionConfig implements Serializable {
}
/**
- * Enables or disables topology recovery
+ * Enables or disables topology recovery.
* @param topologyRecovery if true, enables topology recovery
* @return the Builder
*/
@@ -375,7 +386,7 @@ public class RMQConnectionConfig implements Serializable {
}
/**
- * Set the requested maximum frame size
+ * Set the requested maximum frame size.
* @param requestedFrameMax initially requested maximum frame size, in octets; zero for unlimited
* @return the Builder
*/
@@ -385,7 +396,7 @@ public class RMQConnectionConfig implements Serializable {
}
/**
- * Set the requested maximum channel number
+ * Set the requested maximum channel number.
* @param requestedChannelMax initially requested maximum channel number; zero for unlimited
*/
public Builder setRequestedChannelMax(int requestedChannelMax) {
@@ -414,7 +425,7 @@ public class RMQConnectionConfig implements Serializable {
}
/**
- * Enables or disables automatic connection recovery
+ * Enables or disables automatic connection recovery.
* @param automaticRecovery if true, enables connection recovery
* @return the Builder
*/
@@ -424,17 +435,18 @@ public class RMQConnectionConfig implements Serializable {
}
/**
- * The Builder method
- * If URI is NULL we use host, port, vHost, username, password combination
+ * The Builder method.
+ *
+ * <p>If URI is NULL we use host, port, vHost, username, password combination
* to initialize connection. using {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String,
- * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}
+ * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}.
*
- * else URI will be used to initialize the client connection
+ * <p>Otherwise the URI will be used to initialize the client connection
* {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}
* @return RMQConnectionConfig
*/
public RMQConnectionConfig build(){
- if(this.uri != null) {
+ if (this.uri != null) {
return new RMQConnectionConfig(this.uri, this.networkRecoveryInterval,
this.automaticRecovery, this.topologyRecovery, this.connectionTimeout, this.requestedChannelMax,
this.requestedFrameMax, this.requestedHeartbeat);
http://git-wip-us.apache.org/repos/asf/flink/blob/d4f73391/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index b65ddf0..05ae810 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -17,12 +17,6 @@
package org.apache.flink.streaming.connectors.rabbitmq;
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.QueueingConsumer;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -39,6 +33,13 @@ import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.QueueingConsumer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -59,7 +60,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
-
/**
* Tests for the RMQSource. The source supports two operation modes.
* 1) Exactly-once (when checkpointed) with RabbitMQ transactions and the deduplication mechanism in
@@ -67,7 +67,7 @@ import static org.mockito.Matchers.any;
* 2) At-least-once (when checkpointed) with RabbitMQ transactions but not deduplication.
* 3) No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
*
- * This tests assumes that the message ids are increasing monotonously. That doesn't have to be the
+ * <p>This tests assumes that the message ids are increasing monotonously. That doesn't have to be the
* case. The correlation id is used to uniquely identify messages.
*/
@RunWith(PowerMockRunner.class)
@@ -156,7 +156,7 @@ public class RMQSourceTest {
long totalNumberOfAcks = 0;
- for (int i=0; i < numSnapshots; i++) {
+ for (int i = 0; i < numSnapshots; i++) {
long snapshotId = random.nextLong();
OperatorStateHandles data;
@@ -230,9 +230,8 @@ public class RMQSourceTest {
}
}
-
/**
- * The source should not acknowledge ids in auto-commit mode or check for previously acknowledged ids
+ * The source should not acknowledge ids in auto-commit mode or check for previously acknowledged ids.
*/
@Test
public void testCheckpointingDisabled() throws Exception {
@@ -248,7 +247,7 @@ public class RMQSourceTest {
}
/**
- * Tests error reporting in case of invalid correlation ids
+ * Tests error reporting in case of invalid correlation ids.
*/
@Test
public void testCorrelationIdNotSet() throws InterruptedException {
http://git-wip-us.apache.org/repos/asf/flink/blob/d4f73391/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
index 40985ce..9cfac92 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
@@ -26,7 +26,9 @@ import java.security.NoSuchAlgorithmException;
import static org.junit.Assert.assertEquals;
-
+/**
+ * Tests for the {@link RMQConnectionConfig}.
+ */
public class RMQConnectionConfigTest {
@Test(expected = NullPointerException.class)
@@ -37,6 +39,7 @@ public class RMQConnectionConfigTest {
.setPassword("guest").setVirtualHost("/").build();
connectionConfig.getConnectionFactory();
}
+
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointExceptionIfPortIsNull() throws NoSuchAlgorithmException,
KeyManagementException, URISyntaxException {
http://git-wip-us.apache.org/repos/asf/flink/blob/d4f73391/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
index 199cd1e..540a7ba 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
@@ -17,20 +17,28 @@
package org.apache.flink.streaming.connectors.rabbitmq.common;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.*;
-
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RMQSink}.
+ */
public class RMQSinkTest {
private static final String QUEUE_NAME = "queue";
@@ -43,7 +51,6 @@ public class RMQSinkTest {
private Channel channel;
private SerializationSchema<String> serializationSchema;
-
@Before
public void before() throws Exception {
serializationSchema = spy(new DummySerializationSchema());