You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2020/05/09 07:05:15 UTC

[flink] branch master updated: [FLINK-17204][connectors/rabbitmq] Make RMQ queue declaration consistent between source and sink

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b8ff763  [FLINK-17204][connectors/rabbitmq] Make RMQ queue declaration consistent between source and sink
b8ff763 is described below

commit b8ff76389a71978d247f0f59a16ccb98334a982b
Author: austin ce <au...@gmail.com>
AuthorDate: Tue May 5 14:31:17 2020 -0400

    [FLINK-17204][connectors/rabbitmq] Make RMQ queue declaration consistent between source and sink
    
    This closes #12001
---
 .../streaming/connectors/rabbitmq/RMQSink.java     |  2 +-
 .../streaming/connectors/rabbitmq/RMQSource.java   |  2 +-
 .../flink/streaming/connectors/rabbitmq/Util.java  | 34 +++++++++
 .../streaming/connectors/rabbitmq/RMQSinkTest.java |  4 +-
 .../connectors/rabbitmq/RMQSourceTest.java         | 80 ++++++++++++++++++----
 5 files changed, 105 insertions(+), 17 deletions(-)

diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 5e3696a..5966713 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -121,7 +121,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 	 */
 	protected void setupQueue() throws IOException {
 		if (queueName != null) {
-			channel.queueDeclare(queueName, false, false, false, null);
+			Util.declareQueueDefaults(channel, queueName);
 		}
 	}
 
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 99b94df..b34aa49 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -138,7 +138,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	 * defining custom queue parameters)
 	 */
 	protected void setupQueue() throws IOException {
-		channel.queueDeclare(queueName, true, false, false, null);
+		Util.declareQueueDefaults(channel, queueName);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/Util.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/Util.java
new file mode 100644
index 0000000..0c34be8
--- /dev/null
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/Util.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+
+import java.io.IOException;
+
+class Util {
+	/**
+	 * Declares a queue with the defaults shared by the RMQ source and sink: durable, non-exclusive, not auto-deleted, and with no extra arguments.
+	 * @param channel the open RMQ channel.
+	 * @param queueName the name of the queue.
+	 * @throws IOException if an error is encountered setting up the queue.
+	 */
+	static void declareQueueDefaults(Channel channel, String queueName) throws IOException {
+		channel.queueDeclare(queueName, true, false, false, null);
+	}
+}
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
index 51c8497..ea126d0 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
@@ -87,14 +87,14 @@ public class RMQSinkTest {
 	public void openCallDeclaresQueueInStandardMode() throws Exception {
 		createRMQSink();
 
-		verify(channel).queueDeclare(QUEUE_NAME, false, false, false, null);
+		verify(channel).queueDeclare(QUEUE_NAME, true, false, false, null);
 	}
 
 	@Test
 	public void openCallDontDeclaresQueueInWithOptionsMode() throws Exception {
 		createRMQSinkWithOptions(false, false);
 
-		verify(channel, never()).queueDeclare(null, false, false, false, null);
+		verify(channel, never()).queueDeclare(null, true, false, false, null);
 	}
 
 	@Test
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 7474c2f..925457f 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -89,13 +89,21 @@ public class RMQSourceTest {
 
 	private volatile Exception exception;
 
-	@Before
-	public void beforeTest() throws Exception {
-
+	/**
+	 * Gets a mock context for initializing the source's state via {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState}.
+	 * @throws Exception if setting up the mocked state store fails.
+	 */
+	FunctionInitializationContext getMockContext() throws Exception {
 		OperatorStateStore mockStore = Mockito.mock(OperatorStateStore.class);
 		FunctionInitializationContext mockContext = Mockito.mock(FunctionInitializationContext.class);
 		Mockito.when(mockContext.getOperatorStateStore()).thenReturn(mockStore);
 		Mockito.when(mockStore.getSerializableListState(any(String.class))).thenReturn(null);
+		return mockContext;
+	}
+
+	@Before
+	public void beforeTest() throws Exception {
+		FunctionInitializationContext mockContext = getMockContext();
 
 		source = new RMQTestSource();
 		source.initializeState(mockContext);
@@ -141,6 +149,26 @@ public class RMQSourceTest {
 	}
 
 	@Test
+	public void testOpenCallDeclaresQueueInStandardMode() throws Exception {
+		FunctionInitializationContext mockContext = getMockContext();
+
+		RMQConnectionConfig connectionConfig = Mockito.mock(RMQConnectionConfig.class);
+		ConnectionFactory connectionFactory = Mockito.mock(ConnectionFactory.class);
+		Connection connection = Mockito.mock(Connection.class);
+		Channel channel = Mockito.mock(Channel.class);
+
+		Mockito.when(connectionConfig.getConnectionFactory()).thenReturn(connectionFactory);
+		Mockito.when(connectionFactory.newConnection()).thenReturn(connection);
+		Mockito.when(connection.createChannel()).thenReturn(channel);
+
+		RMQSource<String> rmqSource = new RMQMockedRuntimeTestSource(connectionConfig);
+		rmqSource.initializeState(mockContext);
+		rmqSource.open(new Configuration());
+
+		Mockito.verify(channel).queueDeclare(RMQTestSource.QUEUE_NAME, true, false, false, null);
+	}
+
+	@Test
 	public void testCheckpointing() throws Exception {
 		source.autoAck = false;
 
@@ -354,19 +382,50 @@ public class RMQSourceTest {
 		}
 	}
 
-	private class RMQTestSource extends RMQSource<String> {
+	/**
+	 * Base class of {@link RMQTestSource} that returns a mocked {@link RuntimeContext}, for testing functions that rely on it.
+	 */
+	private static class RMQMockedRuntimeTestSource extends RMQSource<String> {
+		static final String QUEUE_NAME = "queueDummy";
+
+		static final RMQConnectionConfig CONNECTION_CONFIG = new RMQConnectionConfig
+			.Builder()
+			.setHost("hostTest")
+			.setPort(999)
+			.setUserName("userTest")
+			.setPassword("passTest")
+			.setVirtualHost("/")
+			.build();
+
+		protected RuntimeContext runtimeContext = Mockito.mock(StreamingRuntimeContext.class);
+
+		public RMQMockedRuntimeTestSource(RMQConnectionConfig connectionConfig, DeserializationSchema<String> deserializationSchema) {
+			super(connectionConfig, QUEUE_NAME, true, deserializationSchema);
+		}
+
+		public RMQMockedRuntimeTestSource(DeserializationSchema<String> deserializationSchema) {
+			this(CONNECTION_CONFIG, deserializationSchema);
+		}
+
+		public RMQMockedRuntimeTestSource(RMQConnectionConfig connectionConfig) {
+			this(connectionConfig, new StringDeserializationScheme());
+		}
 
+		@Override
+		public RuntimeContext getRuntimeContext() {
+			return runtimeContext;
+		}
+	}
+
+	private class RMQTestSource extends RMQMockedRuntimeTestSource {
 		private ArrayDeque<Tuple2<Long, Set<String>>> restoredState;
-		private RuntimeContext runtimeContext = Mockito.mock(StreamingRuntimeContext.class);
 
 		public RMQTestSource() {
 			this(new StringDeserializationScheme());
 		}
 
 		public RMQTestSource(DeserializationSchema<String> deserializationSchema) {
-			super(new RMQConnectionConfig.Builder().setHost("hostTest")
-					.setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/").build()
-				, "queueDummy", true, deserializationSchema);
+			super(deserializationSchema);
 		}
 
 		@Override
@@ -438,11 +497,6 @@ public class RMQSourceTest {
 		}
 
 		@Override
-		public RuntimeContext getRuntimeContext() {
-			return runtimeContext;
-		}
-
-		@Override
 		protected boolean addId(String uid) {
 			assertEquals(false, autoAck);
 			return super.addId(uid);