You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 21:04:53 UTC
[flink] 02/06: [FLINK-7267][connectors/rabbitmq] Allow overriding
RMQSink connection
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6fa85fea3c3dc1414c1aa4147744c82b3f4fede0
Author: austin ce <au...@gmail.com>
AuthorDate: Mon May 11 20:00:36 2020 -0400
[FLINK-7267][connectors/rabbitmq] Allow overriding RMQSink connection
This closes #12185
---
.../flink/streaming/connectors/rabbitmq/RMQSink.java | 19 +++++++++++++++++--
.../streaming/connectors/rabbitmq/RMQSinkTest.java | 19 +++++++++++++++++++
2 files changed, 36 insertions(+), 2 deletions(-)
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 5966713..0218df5 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -137,13 +137,28 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
this.logFailuresOnly = logFailuresOnly;
}
+ /**
+ * Initializes the connection to RMQ with a default connection factory. The user may override
+ * this method to setup and configure their own {@link ConnectionFactory}.
+ */
+ protected ConnectionFactory setupConnectionFactory() throws Exception {
+ return rmqConnectionConfig.getConnectionFactory();
+ }
+
+ /**
+ * Initializes the connection to RMQ using the default connection factory from {@link #setupConnectionFactory()}.
+ * The user may override this method to setup and configure their own {@link Connection}.
+ */
+ protected Connection setupConnection() throws Exception {
+ return setupConnectionFactory().newConnection();
+ }
+
@Override
public void open(Configuration config) throws Exception {
- ConnectionFactory factory = rmqConnectionConfig.getConnectionFactory();
schema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));
try {
- connection = factory.newConnection();
+ connection = setupConnection();
channel = connection.createChannel();
if (channel == null) {
throw new RuntimeException("None of RabbitMQ channels are available");
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
index ea126d0..72fe1af 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
@@ -43,6 +43,7 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -98,6 +99,24 @@ public class RMQSinkTest {
}
@Test
+ public void testOverrideConnection() throws Exception {
+ final Connection mockConnection = mock(Connection.class);
+ Channel channel = mock(Channel.class);
+ when(mockConnection.createChannel()).thenReturn(channel);
+
+ RMQSink<String> rmqSink = new RMQSink<String>(rmqConnectionConfig, QUEUE_NAME, serializationSchema) {
+ @Override
+ protected Connection setupConnection() throws Exception {
+ return mockConnection;
+ }
+ };
+
+ rmqSink.open(new Configuration());
+
+ verify(mockConnection, times(1)).createChannel();
+ }
+
+ @Test
public void throwExceptionIfChannelIsNull() throws Exception {
when(connection.createChannel()).thenReturn(null);
try {