You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GJL <gi...@git.apache.org> on 2018/01/19 14:19:20 UTC
[GitHub] flink pull request #4979: RMQSource support disabling queue declaration
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/4979#discussion_r162632757
--- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ---
@@ -138,7 +138,9 @@ protected ConnectionFactory setupConnectionFactory() throws Exception {
* defining custom queue parameters)
*/
protected void setupQueue() throws IOException {
- channel.queueDeclare(queueName, true, false, false, null);
+ if (rmqConnectionConfig.isQueueDeclaration()) {
--- End diff --
Thanks for your contribution to Apache Flink @sihuazhou! I have reviewed your code, and I am not sure if the additional flag is needed. The original author of the `RMQSource` declared this method protected, which means that if you do not want the queue to be declared, you can simply override the method with an empty implementation. For example:
```
env.addSource(new RMQSource<String>(
connectionConfig,
"queueName",
true,
new SimpleStringSchema()) {
@Override
protected void setupQueue() {
// do not declare queue
}
});
```
This intent is also reflected in the Javadoc:
```
/**
* Sets up the queue. The default implementation just declares the queue. The user may override
* this method to have a custom setup for the queue (i.e. binding the queue to an exchange or
* defining custom queue parameters)
*/
```
Moreover, `RMQSink#setupQueue` also declares the queue by default, which is not addressed in your pull request. Please let me know what you think @sihuazhou
cc: @tzulitai @zentol
---