You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/07/29 18:24:32 UTC

[flink] 02/02: [FLINK-17529][connectors/rabbitmq] Upgrade com.rabbitmq:amqp-client to 5.9.0

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4130b3bab13d2780e019ad1fa84f3ec0365f6ce0
Author: austin ce <au...@gmail.com>
AuthorDate: Wed Jul 29 11:36:41 2020 -0400

    [FLINK-17529][connectors/rabbitmq] Upgrade com.rabbitmq:amqp-client to 5.9.0
---
 flink-connectors/flink-connector-rabbitmq/pom.xml  |  2 +-
 .../streaming/connectors/rabbitmq/Delivery.java    | 67 ----------------------
 .../connectors/rabbitmq/QueueingConsumer.java      |  1 +
 .../streaming/connectors/rabbitmq/RMQSource.java   |  1 +
 .../connectors/rabbitmq/RMQSourceTest.java         |  1 +
 5 files changed, 4 insertions(+), 68 deletions(-)

diff --git a/flink-connectors/flink-connector-rabbitmq/pom.xml b/flink-connectors/flink-connector-rabbitmq/pom.xml
index d6e4c3f..2770e2d 100644
--- a/flink-connectors/flink-connector-rabbitmq/pom.xml
+++ b/flink-connectors/flink-connector-rabbitmq/pom.xml
@@ -37,7 +37,7 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<rabbitmq.version>4.2.0</rabbitmq.version>
+		<rabbitmq.version>5.9.0</rabbitmq.version>
 	</properties>
 
 	<dependencies>
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/Delivery.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/Delivery.java
deleted file mode 100644
index 4e3418a..0000000
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/Delivery.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Changes made to the source, taken from com.rabbitmq:amqp-client:4.2.0:
-//	- brought this class out of com.rabbitmq.client.QueueingConsumer
-
-package org.apache.flink.streaming.connectors.rabbitmq;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Envelope;
-
-/**
- * Encapsulates an arbitrary message - simple "bean" holder structure.
- * TODO: replace this with `com.rabbitmq.client.Delivery` in RMQ v5.x
- */
-public class Delivery {
-	private final Envelope envelope;
-	private final AMQP.BasicProperties properties;
-	private final byte[] body;
-
-	public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
-		this.envelope = envelope;
-		this.properties = properties;
-		this.body = body;
-	}
-
-	/**
-	 * Retrieve the message envelope.
-	 *
-	 * @return the message envelope
-	 */
-	public Envelope getEnvelope() {
-		return envelope;
-	}
-
-	/**
-	 * Retrieve the message properties.
-	 *
-	 * @return the message properties
-	 */
-	public AMQP.BasicProperties getProperties() {
-		return properties;
-	}
-
-	/**
-	 * Retrieve the message body.
-	 *
-	 * @return the message body
-	 */
-	public byte[] getBody() {
-		return body;
-	}
-}
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/QueueingConsumer.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/QueueingConsumer.java
index f99a120..7e92161 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/QueueingConsumer.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/QueueingConsumer.java
@@ -25,6 +25,7 @@ import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.ConsumerCancelledException;
 import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Delivery;
 import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.ShutdownSignalException;
 import com.rabbitmq.utility.Utility;
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index cae3bc1..594741f 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -33,6 +33,7 @@ import org.apache.flink.util.Preconditions;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Delivery;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 9e5eedd..df229779 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -40,6 +40,7 @@ import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Delivery;
 import com.rabbitmq.client.Envelope;
 import org.junit.After;
 import org.junit.Before;