You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/07/29 18:24:30 UTC

[flink] branch master updated (4343e00 -> 4130b3b)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 4343e00  [FLINK-18606][java-streaming] Remove unused generic parameter from SinkFunction.Context
     new 3b6ca3c  [FLINK-10195][connectors/rabbitmq] Allow setting QoS
     new 4130b3b  [FLINK-17529][connectors/rabbitmq] Upgrade com.rabbitmq:amqp-client to 5.9.0

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/connectors/rabbitmq.md                    |  34 +++++
 flink-connectors/flink-connector-rabbitmq/pom.xml  |   2 +-
 .../connectors/rabbitmq/QueueingConsumer.java      | 160 +++++++++++++++++++++
 .../streaming/connectors/rabbitmq/RMQSource.java   |  28 +++-
 .../rabbitmq/common/RMQConnectionConfig.java       |  49 +++++--
 .../connectors/rabbitmq/RMQSourceTest.java         |  57 +++++++-
 .../rabbitmq/common/RMQConnectionConfigTest.java   |  31 ++++
 7 files changed, 346 insertions(+), 15 deletions(-)
 create mode 100644 flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/QueueingConsumer.java


[flink] 02/02: [FLINK-17529][connectors/rabbitmq] Upgrade com.rabbitmq:amqp-client to 5.9.0

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4130b3bab13d2780e019ad1fa84f3ec0365f6ce0
Author: austin ce <au...@gmail.com>
AuthorDate: Wed Jul 29 11:36:41 2020 -0400

    [FLINK-17529][connectors/rabbitmq] Upgrade com.rabbitmq:amqp-client to 5.9.0
---
 flink-connectors/flink-connector-rabbitmq/pom.xml  |  2 +-
 .../streaming/connectors/rabbitmq/Delivery.java    | 67 ----------------------
 .../connectors/rabbitmq/QueueingConsumer.java      |  1 +
 .../streaming/connectors/rabbitmq/RMQSource.java   |  1 +
 .../connectors/rabbitmq/RMQSourceTest.java         |  1 +
 5 files changed, 4 insertions(+), 68 deletions(-)

diff --git a/flink-connectors/flink-connector-rabbitmq/pom.xml b/flink-connectors/flink-connector-rabbitmq/pom.xml
index d6e4c3f..2770e2d 100644
--- a/flink-connectors/flink-connector-rabbitmq/pom.xml
+++ b/flink-connectors/flink-connector-rabbitmq/pom.xml
@@ -37,7 +37,7 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<rabbitmq.version>4.2.0</rabbitmq.version>
+		<rabbitmq.version>5.9.0</rabbitmq.version>
 	</properties>
 
 	<dependencies>
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/Delivery.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/Delivery.java
deleted file mode 100644
index 4e3418a..0000000
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/Delivery.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Changes made to the source, taken from com.rabbitmq:amqp-client:4.2.0:
-//	- brought this class out of com.rabbitmq.client.QueueingConsumer
-
-package org.apache.flink.streaming.connectors.rabbitmq;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Envelope;
-
-/**
- * Encapsulates an arbitrary message - simple "bean" holder structure.
- * TODO: replace this with `com.rabbitmq.client.Delivery` in RMQ v5.x
- */
-public class Delivery {
-	private final Envelope envelope;
-	private final AMQP.BasicProperties properties;
-	private final byte[] body;
-
-	public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
-		this.envelope = envelope;
-		this.properties = properties;
-		this.body = body;
-	}
-
-	/**
-	 * Retrieve the message envelope.
-	 *
-	 * @return the message envelope
-	 */
-	public Envelope getEnvelope() {
-		return envelope;
-	}
-
-	/**
-	 * Retrieve the message properties.
-	 *
-	 * @return the message properties
-	 */
-	public AMQP.BasicProperties getProperties() {
-		return properties;
-	}
-
-	/**
-	 * Retrieve the message body.
-	 *
-	 * @return the message body
-	 */
-	public byte[] getBody() {
-		return body;
-	}
-}
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/QueueingConsumer.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/QueueingConsumer.java
index f99a120..7e92161 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/QueueingConsumer.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/QueueingConsumer.java
@@ -25,6 +25,7 @@ import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.ConsumerCancelledException;
 import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Delivery;
 import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.ShutdownSignalException;
 import com.rabbitmq.utility.Utility;
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index cae3bc1..594741f 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -33,6 +33,7 @@ import org.apache.flink.util.Preconditions;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Delivery;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 9e5eedd..df229779 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -40,6 +40,7 @@ import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Delivery;
 import com.rabbitmq.client.Envelope;
 import org.junit.After;
 import org.junit.Before;


[flink] 01/02: [FLINK-10195][connectors/rabbitmq] Allow setting QoS

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b6ca3c512f2742c84b3c623ac79c61a04af9ab1
Author: austin ce <au...@gmail.com>
AuthorDate: Mon Jun 15 09:31:08 2020 -0400

    [FLINK-10195][connectors/rabbitmq] Allow setting QoS
---
 docs/dev/connectors/rabbitmq.md                    |  34 +++++
 .../streaming/connectors/rabbitmq/Delivery.java    |  67 +++++++++
 .../connectors/rabbitmq/QueueingConsumer.java      | 159 +++++++++++++++++++++
 .../streaming/connectors/rabbitmq/RMQSource.java   |  27 +++-
 .../rabbitmq/common/RMQConnectionConfig.java       |  49 +++++--
 .../connectors/rabbitmq/RMQSourceTest.java         |  56 +++++++-
 .../rabbitmq/common/RMQConnectionConfigTest.java   |  31 ++++
 7 files changed, 409 insertions(+), 14 deletions(-)

diff --git a/docs/dev/connectors/rabbitmq.md b/docs/dev/connectors/rabbitmq.md
index 819979e..87473e0 100644
--- a/docs/dev/connectors/rabbitmq.md
+++ b/docs/dev/connectors/rabbitmq.md
@@ -131,6 +131,40 @@ val stream = env
 </div>
 </div>
 
+#### Quality of Service (QoS) / Consumer Prefetch
+
+The RabbitMQ Source provides a simple way to set the `basicQos` on the source's channel through the `RMQConnectionConfig`.
+Since there is one connection/ channel per-parallel source, this prefetch count will effectively be multiplied by the
+source's parallelism for how many total unacknowledged messages can be sent to the job at one time.
+If more complex configuration is required, `RMQSource#setupChannel(Connection)` can be overridden and manually configured.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+    .setPrefetchCount(30_000)
+    ...
+    .build();
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val connectionConfig = new RMQConnectionConfig.Builder()
+    .setPrefetchCount(30000)
+    ...
+    .build
+{% endhighlight %}
+</div>
+</div>
+
+The prefetch count is unset by default, meaning the RabbitMQ server will send unlimited messages. In production, it
+is best to set this value. For high volume queues and checkpointing enabled, some tuning may be required to reduce
+wasted cycles, as messages are only acknowledged on checkpoints if enabled.
+
+More about QoS and prefetch can be found [here](https://www.rabbitmq.com/confirms.html#channel-qos-prefetch)
+and more about the options available in AMQP 0-9-1 [here](https://www.rabbitmq.com/consumer-prefetch.html).
+
 ### RabbitMQ Sink
 This connector provides a `RMQSink` class for sending messages to a RabbitMQ
 queue. Below is a code example for setting up a RabbitMQ sink.
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/Delivery.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/Delivery.java
new file mode 100644
index 0000000..4e3418a
--- /dev/null
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/Delivery.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Changes made to the source, taken from com.rabbitmq:amqp-client:4.2.0:
+//	- brought this class out of com.rabbitmq.client.QueueingConsumer
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+/**
+ * Encapsulates an arbitrary message - simple "bean" holder structure.
+ * TODO: replace this with `com.rabbitmq.client.Delivery` in RMQ v5.x
+ */
+public class Delivery {
+	private final Envelope envelope;
+	private final AMQP.BasicProperties properties;
+	private final byte[] body;
+
+	public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
+		this.envelope = envelope;
+		this.properties = properties;
+		this.body = body;
+	}
+
+	/**
+	 * Retrieve the message envelope.
+	 *
+	 * @return the message envelope
+	 */
+	public Envelope getEnvelope() {
+		return envelope;
+	}
+
+	/**
+	 * Retrieve the message properties.
+	 *
+	 * @return the message properties
+	 */
+	public AMQP.BasicProperties getProperties() {
+		return properties;
+	}
+
+	/**
+	 * Retrieve the message body.
+	 *
+	 * @return the message body
+	 */
+	public byte[] getBody() {
+		return body;
+	}
+}
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/QueueingConsumer.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/QueueingConsumer.java
new file mode 100644
index 0000000..f99a120
--- /dev/null
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/QueueingConsumer.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Changes made to the source, taken from com.rabbitmq:amqp-client:4.2.0:
+//	- copied from com.rabbitmq.client.QueueingConsumer
+//	- updated naming conventions for the Apache Flink standards
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.ConsumerCancelledException;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.ShutdownSignalException;
+import com.rabbitmq.utility.Utility;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+class QueueingConsumer extends DefaultConsumer {
+	private final BlockingQueue<Delivery> queue;
+
+	// When this is non-null the queue is in shutdown mode and nextDelivery should
+	// throw a shutdown signal exception.
+	private volatile ShutdownSignalException shutdown;
+	private volatile ConsumerCancelledException cancelled;
+
+	// Marker object used to signal the queue is in shutdown mode.
+	// It is only there to wake up consumers. The canonical representation
+	// of shutting down is the presence of shutdown.
+	// Invariant: This is never on queue unless shutdown != null.
+	private static final Delivery POISON = new Delivery(null, null, null);
+
+	public QueueingConsumer(Channel channel) {
+		this(channel, Integer.MAX_VALUE);
+	}
+
+	public QueueingConsumer(Channel channel, int capacity) {
+		super(channel);
+		this.queue = new LinkedBlockingQueue<>(capacity);
+	}
+
+	/**
+	 * Check if we are in shutdown mode and if so throw an exception.
+	 */
+	private void checkShutdown() {
+		if (shutdown != null) {
+			throw Utility.fixStackTrace(shutdown);
+		}
+	}
+
+	/**
+	 * If delivery is not POISON nor null, return it.
+	 * <p/>
+	 * If delivery, shutdown and cancelled are all null, return null.
+	 * <p/>
+	 * If delivery is POISON re-insert POISON into the queue and
+	 * throw an exception if POISONed for no reason.
+	 * <p/>
+	 * Otherwise, if we are in shutdown mode or cancelled,
+	 * throw a corresponding exception.
+	 */
+	private Delivery handle(Delivery delivery) {
+		if (delivery == POISON ||
+			delivery == null && (shutdown != null || cancelled != null)) {
+			if (delivery == POISON) {
+				queue.add(POISON);
+				if (shutdown == null && cancelled == null) {
+					throw new IllegalStateException(
+						"POISON in queue, but null shutdown and null cancelled. " +
+							"This should never happen, please report as a BUG");
+				}
+			}
+			if (null != shutdown) {
+				throw Utility.fixStackTrace(shutdown);
+			}
+			if (null != cancelled) {
+				throw Utility.fixStackTrace(cancelled);
+			}
+		}
+		return delivery;
+	}
+
+	/**
+	 * Main application-side API: wait for the next message delivery and return it.
+	 *
+	 * @return the next message
+	 * @throws InterruptedException       if an interrupt is received while waiting
+	 * @throws ShutdownSignalException    if the connection is shut down while waiting
+	 * @throws ConsumerCancelledException if this consumer is cancelled while waiting
+	 */
+	public Delivery nextDelivery() throws InterruptedException, ShutdownSignalException, ConsumerCancelledException {
+		return handle(queue.take());
+	}
+
+	/**
+	 * Main application-side API: wait for the next message delivery and return it.
+	 *
+	 * @param timeout timeout in millisecond
+	 * @return the next message or null if timed out
+	 * @throws InterruptedException       if an interrupt is received while waiting
+	 * @throws ShutdownSignalException    if the connection is shut down while waiting
+	 * @throws ConsumerCancelledException if this consumer is cancelled while waiting
+	 */
+	public Delivery nextDelivery(long timeout)
+		throws InterruptedException, ShutdownSignalException, ConsumerCancelledException {
+		return nextDelivery(timeout, TimeUnit.MILLISECONDS);
+	}
+
+	/**
+	 * Main application-side API: wait for the next message delivery and return it.
+	 *
+	 * @param timeout timeout
+	 * @param unit    timeout unit
+	 * @return the next message or null if timed out
+	 * @throws InterruptedException       if an interrupt is received while waiting
+	 * @throws ShutdownSignalException    if the connection is shut down while waiting
+	 * @throws ConsumerCancelledException if this consumer is cancelled while waiting
+	 */
+	public Delivery nextDelivery(long timeout, TimeUnit unit)
+		throws InterruptedException, ShutdownSignalException, ConsumerCancelledException {
+		return handle(queue.poll(timeout, unit));
+	}
+
+	@Override
+	public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
+		shutdown = sig;
+		queue.add(POISON);
+	}
+
+	@Override
+	public void handleCancel(String consumerTag) throws IOException {
+		cancelled = new ConsumerCancelledException();
+		queue.add(POISON);
+	}
+
+	@Override
+	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+		checkShutdown();
+		this.queue.add(new Delivery(envelope, properties, body));
+	}
+}
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index e32ce2d..cae3bc1 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.rabbitmq;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -32,7 +33,6 @@ import org.apache.flink.util.Preconditions;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.QueueingConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,15 +137,36 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	 * Initializes the connection to RMQ using the default connection factory from {@link #setupConnectionFactory()}.
 	 * The user may override this method to setup and configure their own {@link Connection}.
 	 */
+	@VisibleForTesting
 	protected Connection setupConnection() throws Exception {
 		return setupConnectionFactory().newConnection();
 	}
 
 	/**
+	 * Initializes the consumer's {@link Channel}. If a prefetch count has been set in {@link RMQConnectionConfig},
+	 * the new channel will be use it for {@link Channel#basicQos(int)}.
+	 *
+	 * @param connection the consumer's {@link Connection}.
+	 * @return the channel.
+	 * @throws Exception if there is an issue creating or configuring the channel.
+	 */
+	private Channel setupChannel(Connection connection) throws Exception {
+		Channel chan = connection.createChannel();
+		if (rmqConnectionConfig.getPrefetchCount().isPresent()) {
+			// set the global flag for the entire channel, though shouldn't make a difference
+			// since there is only one consumer, and each parallel instance of the source will
+			// create a new connection (and channel)
+			chan.basicQos(rmqConnectionConfig.getPrefetchCount().get(), true);
+		}
+		return chan;
+	}
+
+	/**
 	 * Sets up the queue. The default implementation just declares the queue. The user may override
 	 * this method to have a custom setup for the queue (i.e. binding the queue to an exchange or
 	 * defining custom queue parameters)
 	 */
+	@VisibleForTesting
 	protected void setupQueue() throws IOException {
 		Util.declareQueueDefaults(channel, queueName);
 	}
@@ -155,7 +176,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 		super.open(config);
 		try {
 			connection = setupConnection();
-			channel = connection.createChannel();
+			channel = setupChannel(connection);
 			if (channel == null) {
 				throw new RuntimeException("None of RabbitMQ channels are available");
 			}
@@ -219,7 +240,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	public void run(SourceContext<OUT> ctx) throws Exception {
 		final RMQCollector collector = new RMQCollector(ctx);
 		while (running) {
-			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
+			Delivery delivery = consumer.nextDelivery();
 
 			synchronized (ctx.getCheckpointLock()) {
 				if (!autoAck) {
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
index bbb48ce..aaa9fe6 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.rabbitmq.common;
 
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
 import org.apache.flink.util.Preconditions;
 
 import com.rabbitmq.client.ConnectionFactory;
@@ -27,14 +28,15 @@ import java.io.Serializable;
 import java.net.URISyntaxException;
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
+import java.util.Optional;
 
 /**
  * Connection Configuration for RMQ.
  * If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer,
- * Boolean, Boolean, Integer, Integer, Integer, Integer)}
+ * Boolean, Boolean, Integer, Integer, Integer, Integer, Integer)}
  * will be used for initialize the RMQ connection or
  * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String, Integer, Boolean,
- * Boolean, Integer, Integer, Integer, Integer)}
+ * Boolean, Integer, Integer, Integer, Integer, Integer)}
  * will be used for initialize the RMQ connection
  */
 public class RMQConnectionConfig implements Serializable {
@@ -59,6 +61,8 @@ public class RMQConnectionConfig implements Serializable {
 	private Integer requestedFrameMax;
 	private Integer requestedHeartbeat;
 
+	private Integer prefetchCount;
+
 	/**
 	*
 	* @param host host name
@@ -78,7 +82,8 @@ public class RMQConnectionConfig implements Serializable {
 	private RMQConnectionConfig(String host, Integer port, String virtualHost, String username, String password,
 								Integer networkRecoveryInterval, Boolean automaticRecovery,
 								Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
-								Integer requestedFrameMax, Integer requestedHeartbeat){
+								Integer requestedFrameMax, Integer requestedHeartbeat,
+								Integer prefetchCount){
 		Preconditions.checkNotNull(host, "host can not be null");
 		Preconditions.checkNotNull(port, "port can not be null");
 		Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");
@@ -97,6 +102,7 @@ public class RMQConnectionConfig implements Serializable {
 		this.requestedChannelMax = requestedChannelMax;
 		this.requestedFrameMax = requestedFrameMax;
 		this.requestedHeartbeat = requestedHeartbeat;
+		this.prefetchCount = prefetchCount;
 	}
 
 	/**
@@ -113,7 +119,7 @@ public class RMQConnectionConfig implements Serializable {
 	*/
 	private RMQConnectionConfig(String uri, Integer networkRecoveryInterval, Boolean automaticRecovery,
 								Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
-								Integer requestedFrameMax, Integer requestedHeartbeat){
+								Integer requestedFrameMax, Integer requestedHeartbeat, Integer prefetchCount){
 		Preconditions.checkNotNull(uri, "Uri can not be null");
 		this.uri = uri;
 
@@ -124,6 +130,7 @@ public class RMQConnectionConfig implements Serializable {
 		this.requestedChannelMax = requestedChannelMax;
 		this.requestedFrameMax = requestedFrameMax;
 		this.requestedHeartbeat = requestedHeartbeat;
+		this.prefetchCount = prefetchCount;
 	}
 
 	/** @return the host to use for connections */
@@ -225,6 +232,14 @@ public class RMQConnectionConfig implements Serializable {
 	}
 
 	/**
+	 * Retrieve the the channel prefetch count.
+	 * @return an Optional of the prefetch count, if set, for the consumer channel
+	 */
+	public Optional<Integer> getPrefetchCount() {
+		return Optional.ofNullable(prefetchCount);
+	}
+
+	/**
 	 *
 	 * @return Connection Factory for RMQ
 	 * @throws URISyntaxException if Malformed URI has been passed
@@ -302,6 +317,9 @@ public class RMQConnectionConfig implements Serializable {
 		private Integer requestedFrameMax;
 		private Integer requestedHeartbeat;
 
+		// basicQos options for consumers
+		private Integer prefetchCount;
+
 		private String uri;
 
 		/**
@@ -435,25 +453,40 @@ public class RMQConnectionConfig implements Serializable {
 		}
 
 		/**
+		 * Enables setting basicQos for the consumer channel. Only applicable to the {@link RMQSource}. Set to 0
+		 * for unlimited, which is the default.
+		 *
+		 * @see <a href="https://www.rabbitmq.com/consumer-prefetch.html">Consumer Prefetch</a>
+		 * @see <a href="https://www.rabbitmq.com/confirms.html#channel-qos-prefetch">Channel Prefetch (QoS)</a>
+		 * @param prefetchCount the max number of messages to receive without acknowledgement.
+		 * @return the Builder
+		 */
+		public Builder setPrefetchCount(int prefetchCount) {
+			this.prefetchCount = prefetchCount;
+			return this;
+		}
+
+		/**
 		 * The Builder method.
 		 *
 		 * <p>If URI is NULL we use host, port, vHost, username, password combination
 		 * to initialize connection. using  {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String,
-		 * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}.
+		 * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer, Integer)}.
 		 *
 		 * <p>Otherwise the URI will be used to initialize the client connection
-		 * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}
+		 * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer, Integer)}
 		 * @return RMQConnectionConfig
 		 */
 		public RMQConnectionConfig build(){
 			if (this.uri != null) {
 				return new RMQConnectionConfig(this.uri, this.networkRecoveryInterval,
 					this.automaticRecovery, this.topologyRecovery, this.connectionTimeout, this.requestedChannelMax,
-					this.requestedFrameMax, this.requestedHeartbeat);
+					this.requestedFrameMax, this.requestedHeartbeat, this.prefetchCount);
 			} else {
 				return new RMQConnectionConfig(this.host, this.port, this.virtualHost, this.username, this.password,
 					this.networkRecoveryInterval, this.automaticRecovery, this.topologyRecovery,
-					this.connectionTimeout, this.requestedChannelMax, this.requestedFrameMax, this.requestedHeartbeat);
+					this.connectionTimeout, this.requestedChannelMax, this.requestedFrameMax, this.requestedHeartbeat,
+					this.prefetchCount);
 			}
 		}
 	}
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index bb9ccf8..9e5eedd 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -41,7 +41,6 @@ import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.QueueingConsumer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -63,7 +62,8 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 
 /**
  * Tests for the RMQSource. The source supports two operation modes.
@@ -350,6 +350,56 @@ public class RMQSourceTest {
 		Mockito.verify(mockConnection, Mockito.times(1)).createChannel();
 	}
 
+	@Test
+	public void testSetPrefetchCount() throws Exception {
+		RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+			.setHost("localhost")
+			.setPort(5000)
+			.setUserName("guest")
+			.setPassword("guest")
+			.setVirtualHost("/")
+			.setPrefetchCount(1000)
+			.build();
+		final Connection mockConnection = Mockito.mock(Connection.class);
+		Channel channel = Mockito.mock(Channel.class);
+		Mockito.when(mockConnection.createChannel()).thenReturn(channel);
+
+		RMQMockedRuntimeTestSource source = new RMQMockedRuntimeTestSource(connectionConfig) {
+			@Override
+			protected Connection setupConnection() throws Exception {
+				return mockConnection;
+			}
+		};
+
+		FunctionInitializationContext mockContext = getMockContext();
+		source.initializeState(mockContext);
+		source.open(new Configuration());
+
+		Mockito.verify(mockConnection, Mockito.times(1)).createChannel();
+		Mockito.verify(channel, Mockito.times(1)).basicQos(1000, true);
+	}
+
+	@Test
+	public void testUnsetPrefetchCount() throws Exception {
+		final Connection mockConnection = Mockito.mock(Connection.class);
+		Channel channel = Mockito.mock(Channel.class);
+		Mockito.when(mockConnection.createChannel()).thenReturn(channel);
+
+		RMQMockedRuntimeTestSource source = new RMQMockedRuntimeTestSource() {
+			@Override
+			protected Connection setupConnection() throws Exception {
+				return mockConnection;
+			}
+		};
+
+		FunctionInitializationContext mockContext = getMockContext();
+		source.initializeState(mockContext);
+		source.open(new Configuration());
+
+		Mockito.verify(mockConnection, Mockito.times(1)).createChannel();
+		Mockito.verify(channel, Mockito.times(0)).basicQos(anyInt());
+	}
+
 	private static class ConstructorTestClass extends RMQSource<String> {
 
 		private ConnectionFactory factory;
@@ -470,7 +520,7 @@ public class RMQSourceTest {
 			consumer = Mockito.mock(QueueingConsumer.class);
 
 			// Mock for delivery
-			final QueueingConsumer.Delivery deliveryMock = Mockito.mock(QueueingConsumer.Delivery.class);
+			final Delivery deliveryMock = Mockito.mock(Delivery.class);
 			Mockito.when(deliveryMock.getBody()).thenReturn("test".getBytes(ConfigConstants.DEFAULT_CHARSET));
 
 			try {
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
index 9cfac92..59a148c 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
@@ -23,8 +23,11 @@ import org.junit.Test;
 import java.net.URISyntaxException;
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
+import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for the {@link RMQConnectionConfig}.
@@ -69,4 +72,32 @@ public class RMQConnectionConfigTest {
 		ConnectionFactory factory = connectionConfig.getConnectionFactory();
 		assertEquals(5000, factory.getConnectionTimeout());
 	}
+
+	@Test
+	public void shouldSetOptionalPrefetchCount() {
+		RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+			.setHost("localhost")
+			.setPort(5000)
+			.setUserName("guest")
+			.setPassword("guest")
+			.setVirtualHost("/")
+			.setPrefetchCount(500)
+			.build();
+		Optional<Integer> prefetch = connectionConfig.getPrefetchCount();
+		assertTrue(prefetch.isPresent());
+		assertEquals(500, (int) prefetch.get());
+	}
+
+	@Test
+	public void shouldReturnEmptyOptionalPrefetchCount() {
+		RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+			.setHost("localhost")
+			.setPort(5000)
+			.setUserName("guest")
+			.setPassword("guest")
+			.setVirtualHost("/")
+			.build();
+		Optional<Integer> prefetch = connectionConfig.getPrefetchCount();
+		assertFalse(prefetch.isPresent());
+	}
 }