You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 21:04:53 UTC

[flink] 02/06: [FLINK-7267][connectors/rabbitmq] Allow overriding RMQSink connection

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6fa85fea3c3dc1414c1aa4147744c82b3f4fede0
Author: austin ce <au...@gmail.com>
AuthorDate: Mon May 11 20:00:36 2020 -0400

    [FLINK-7267][connectors/rabbitmq] Allow overriding RMQSink connection
    
    This closes #12185
---
 .../flink/streaming/connectors/rabbitmq/RMQSink.java  | 19 +++++++++++++++++--
 .../streaming/connectors/rabbitmq/RMQSinkTest.java    | 19 +++++++++++++++++++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 5966713..0218df5 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -137,13 +137,28 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 		this.logFailuresOnly = logFailuresOnly;
 	}
 
+	/**
+	 * Initializes the connection to RMQ with a default connection factory. The user may override
+	 * this method to setup and configure their own {@link ConnectionFactory}.
+	 */
+	protected ConnectionFactory setupConnectionFactory() throws Exception {
+		return rmqConnectionConfig.getConnectionFactory();
+	}
+
+	/**
+	 * Initializes the connection to RMQ using the default connection factory from {@link #setupConnectionFactory()}.
+	 * The user may override this method to setup and configure their own {@link Connection}.
+	 */
+	protected Connection setupConnection() throws Exception {
+		return setupConnectionFactory().newConnection();
+	}
+
 	@Override
 	public void open(Configuration config) throws Exception {
-		ConnectionFactory factory = rmqConnectionConfig.getConnectionFactory();
 		schema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));
 
 		try {
-			connection = factory.newConnection();
+			connection = setupConnection();
 			channel = connection.createChannel();
 			if (channel == null) {
 				throw new RuntimeException("None of RabbitMQ channels are available");
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
index ea126d0..72fe1af 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
@@ -43,6 +43,7 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -98,6 +99,24 @@ public class RMQSinkTest {
 	}
 
 	@Test
+	public void testOverrideConnection() throws Exception {
+		final Connection mockConnection = mock(Connection.class);
+		Channel channel = mock(Channel.class);
+		when(mockConnection.createChannel()).thenReturn(channel);
+
+		RMQSink<String> rmqSink = new RMQSink<String>(rmqConnectionConfig, QUEUE_NAME, serializationSchema) {
+			@Override
+			protected Connection setupConnection() throws Exception {
+				return mockConnection;
+			}
+		};
+
+		rmqSink.open(new Configuration());
+
+		verify(mockConnection, times(1)).createChannel();
+	}
+
+	@Test
 	public void throwExceptionIfChannelIsNull() throws Exception {
 		when(connection.createChannel()).thenReturn(null);
 		try {