You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/07/29 18:24:32 UTC
[flink] 02/02: [FLINK-17529][connectors/rabbitmq] Upgrade
com.rabbitmq:amqp-client to 5.9.0
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4130b3bab13d2780e019ad1fa84f3ec0365f6ce0
Author: austin ce <au...@gmail.com>
AuthorDate: Wed Jul 29 11:36:41 2020 -0400
[FLINK-17529][connectors/rabbitmq] Upgrade com.rabbitmq:amqp-client to 5.9.0
---
flink-connectors/flink-connector-rabbitmq/pom.xml | 2 +-
.../streaming/connectors/rabbitmq/Delivery.java | 67 ----------------------
.../connectors/rabbitmq/QueueingConsumer.java | 1 +
.../streaming/connectors/rabbitmq/RMQSource.java | 1 +
.../connectors/rabbitmq/RMQSourceTest.java | 1 +
5 files changed, 4 insertions(+), 68 deletions(-)
diff --git a/flink-connectors/flink-connector-rabbitmq/pom.xml b/flink-connectors/flink-connector-rabbitmq/pom.xml
index d6e4c3f..2770e2d 100644
--- a/flink-connectors/flink-connector-rabbitmq/pom.xml
+++ b/flink-connectors/flink-connector-rabbitmq/pom.xml
@@ -37,7 +37,7 @@ under the License.
<!-- Allow users to pass custom connector versions -->
<properties>
- <rabbitmq.version>4.2.0</rabbitmq.version>
+ <rabbitmq.version>5.9.0</rabbitmq.version>
</properties>
<dependencies>
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/Delivery.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/Delivery.java
deleted file mode 100644
index 4e3418a..0000000
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/Delivery.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Changes made to the source, taken from com.rabbitmq:amqp-client:4.2.0:
-// - brought this class out of com.rabbitmq.client.QueueingConsumer
-
-package org.apache.flink.streaming.connectors.rabbitmq;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Envelope;
-
-/**
- * Encapsulates an arbitrary message - simple "bean" holder structure.
- * TODO: replace this with `com.rabbitmq.client.Delivery` in RMQ v5.x
- */
-public class Delivery {
- private final Envelope envelope;
- private final AMQP.BasicProperties properties;
- private final byte[] body;
-
- public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
- this.envelope = envelope;
- this.properties = properties;
- this.body = body;
- }
-
- /**
- * Retrieve the message envelope.
- *
- * @return the message envelope
- */
- public Envelope getEnvelope() {
- return envelope;
- }
-
- /**
- * Retrieve the message properties.
- *
- * @return the message properties
- */
- public AMQP.BasicProperties getProperties() {
- return properties;
- }
-
- /**
- * Retrieve the message body.
- *
- * @return the message body
- */
- public byte[] getBody() {
- return body;
- }
-}
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/QueueingConsumer.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/QueueingConsumer.java
index f99a120..7e92161 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/QueueingConsumer.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/QueueingConsumer.java
@@ -25,6 +25,7 @@ import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.Utility;
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index cae3bc1..594741f 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -33,6 +33,7 @@ import org.apache.flink.util.Preconditions;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Delivery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 9e5eedd..df229779 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -40,6 +40,7 @@ import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
import org.junit.After;
import org.junit.Before;