You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/27 14:40:46 UTC

[2/2] flink git commit: [FLINK-8468][RabbitMQ] Make RabbitMQ connector to take advantage of AMQP features (routing key, exchange and message properties)

[FLINK-8468][RabbitMQ] Make RabbitMQ connector to take advantage of AMQP features (routing key, exchange and message properties)

[FLINK-8468] Make the connector to take advantage of AMQP features (routing key, exchange and message properties) and Test

[FLINK-8468] Make the connector to take advantage of AMQP features (routing key, exchange and message properties) and Test

[FLINK-8468] Make the connector to take advantage of AMQP features (routing key, exchange and message properties) and Test

[FLINK-8468] Make the connector to take advantage of AMQP features (routing key, exchange and message properties) and Test

[FLINK-8468] Make the connector to take advantage of AMQP features (routing key, exchange and message properties) and Test

[FLINK-8468] Make the connector to take advantage of AMQP features (routing key, exchange and message properties) and Test

Fix test assertions and imports

Modified according to code reviews.

[FLINK-8468] Take advantage of AMQP features (routing key, exchange, flags, message properties and returnListener)

Update README.md

Update Readme.md

[FLINK-8468] Take advantage of AMQP features (routing key, exchange, flags, message properties and returnListener)

[FLINK-8468] Take advantage of AMQP features (routing key, exchange, flags, message properties and returnListener)

[FLINK-8468] Take advantage of AMQP features (routing key, exchange, flags, message properties and returnListener)

[FLINK-8468] Take advantage of AMQP features (routing key, exchange, flags, message properties and returnListener)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/32dea9a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/32dea9a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/32dea9a0

Branch: refs/heads/master
Commit: 32dea9a0b97faf82cf7dcc7e3411748544d6633e
Parents: 217c312
Author: Philippe Duveau <ph...@pmu.fr>
Authored: Sun Feb 4 01:15:25 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jun 27 15:04:44 2018 +0200

----------------------------------------------------------------------
 .../flink-connector-rabbitmq/README.md          |   4 +
 .../streaming/connectors/rabbitmq/RMQSink.java  |  78 +++++++++-
 .../rabbitmq/RMQSinkPublishOptions.java         |  72 +++++++++
 .../connectors/rabbitmq/RMQSinkTest.java        | 148 ++++++++++++++++++-
 4 files changed, 297 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/32dea9a0/flink-connectors/flink-connector-rabbitmq/README.md
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/README.md b/flink-connectors/flink-connector-rabbitmq/README.md
index de8d1d8..b270d05 100644
--- a/flink-connectors/flink-connector-rabbitmq/README.md
+++ b/flink-connectors/flink-connector-rabbitmq/README.md
@@ -9,3 +9,7 @@ nor packages binaries from the "RabbitMQ AMQP Java Client".
 Users that create and publish derivative work based on Flink's
 RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
 must be aware that this may be subject to conditions declared in the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").
+
+# This version provides a mechanism to handle AMQ Messaging features
+
+One of its Constructor uses an implemented interface object with five methods and an optionnal returned message handler. See RMQSinkFeatureTest class to get a sample of the methods to implement. The returned message handler is an implementation of the standard com.rabbitmq.client.ReturnListener interface. As this mechasnism uses RoutingKeys, queueName is null then the queue can not be declared to RabbitMQ during start. 

http://git-wip-us.apache.org/repos/asf/flink/blob/32dea9a0/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index c1118ed..0b447c1 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.rabbitmq;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -25,6 +26,7 @@ import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.ReturnListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,15 +49,58 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 	protected SerializationSchema<IN> schema;
 	private boolean logFailuresOnly = false;
 
+	private final RMQSinkPublishOptions<IN> publishOptions;
+	private final ReturnListener returnListener;
+
 	/**
 	 * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
 	 * @param queueName The queue to publish messages to.
 	 * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes
+	 * @param publishOptions A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties
+	 * @param returnListener A ReturnListener implementation object to handle returned message event
      */
-	public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<IN> schema) {
+	private RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<IN> schema,
+			RMQSinkPublishOptions<IN> publishOptions, ReturnListener returnListener) {
 		this.rmqConnectionConfig = rmqConnectionConfig;
 		this.queueName = queueName;
 		this.schema = schema;
+		this.publishOptions = publishOptions;
+		this.returnListener = returnListener;
+	}
+
+	/**
+	 * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
+	 * @param queueName The queue to publish messages to.
+	 * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes
+     */
+	@PublicEvolving
+	public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<IN> schema) {
+		this(rmqConnectionConfig, queueName, schema, null, null);
+	}
+
+	/**
+	 * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
+	 * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes
+	 * @param publishOptions A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties
+	 * In this case the computeMandatoy or computeImmediate MUST return false otherwise an
+	 * IllegalStateException is raised during runtime.
+     */
+	@PublicEvolving
+	public RMQSink(RMQConnectionConfig rmqConnectionConfig, SerializationSchema<IN> schema,
+			RMQSinkPublishOptions<IN> publishOptions) {
+		this(rmqConnectionConfig, null, schema, publishOptions, null);
+	}
+
+	/**
+	 * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
+	 * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes
+	 * @param publishOptions A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties
+	 * @param returnListener A ReturnListener implementation object to handle returned message event
+     */
+	@PublicEvolving
+	public RMQSink(RMQConnectionConfig rmqConnectionConfig, SerializationSchema<IN> schema,
+			RMQSinkPublishOptions<IN> publishOptions, ReturnListener returnListener) {
+		this(rmqConnectionConfig, null, schema, publishOptions, returnListener);
 	}
 
 	/**
@@ -64,7 +109,9 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 	 * defining custom queue parameters)
 	 */
 	protected void setupQueue() throws IOException {
-		channel.queueDeclare(queueName, false, false, false, null);
+		if (queueName != null) {
+			channel.queueDeclare(queueName, false, false, false, null);
+		}
 	}
 
 	/**
@@ -89,6 +136,9 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 				throw new RuntimeException("None of RabbitMQ channels are available");
 			}
 			setupQueue();
+			if (returnListener != null) {
+				channel.addReturnListener(returnListener);
+			}
 		} catch (IOException e) {
 			throw new RuntimeException("Error while creating the channel", e);
 		}
@@ -105,7 +155,29 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 		try {
 			byte[] msg = schema.serialize(value);
 
-			channel.basicPublish("", queueName, null, msg);
+			if (publishOptions == null) {
+				channel.basicPublish("", queueName, null, msg);
+			} else {
+				boolean mandatory = publishOptions.computeMandatory(value);
+				boolean immediate = publishOptions.computeImmediate(value);
+
+				if (returnListener == null && (mandatory || immediate)) {
+					throw new IllegalStateException("Setting mandatory and/or immediate flags to true requires a ReturnListener.");
+				} else {
+					String rk = publishOptions.computeRoutingKey(value);
+					if (rk == null) {
+						throw new NullPointerException("computeRoutingKey returned an anormal 'null' value.");
+					}
+					String exchange = publishOptions.computeExchange(value);
+					if (exchange == null) {
+						throw new NullPointerException("computeExchange returned an anormal 'null' value.");
+					}
+
+					channel.basicPublish(exchange, rk, mandatory, immediate,
+									publishOptions.computeProperties(value), msg);
+				}
+			}
+
 		} catch (IOException e) {
 			if (logFailuresOnly) {
 				LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost(), e);

http://git-wip-us.apache.org/repos/asf/flink/blob/32dea9a0/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java
new file mode 100644
index 0000000..62a0832
--- /dev/null
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import com.rabbitmq.client.AMQP.BasicProperties;
+
+/**
+ * The message computation provides methods to compute the message routing key and/or the properties.
+ *
+ * @param <IN> The type of the data used by the sink.
+ */
+@PublicEvolving
+public interface RMQSinkPublishOptions<IN> extends java.io.Serializable {
+
+	/**
+	 * Compute the message's routing key from the data.
+	 * @param a The data used by the sink
+	 * @return The routing key of the message
+	 * null will raise a NullPointerException
+	 */
+	String computeRoutingKey(IN a);
+
+	/**
+	 * Compute the message's properties from the data.
+	 * @param a The data used by the sink
+	 * @return The message's properties (can be null)
+	 */
+	BasicProperties computeProperties(IN a);
+
+	/**
+	 * Compute the exchange from the data.
+	 * @param a The data used by the sink
+	 * @return The exchange to publish the message to
+	 * null will raise a NullPointerException
+	 */
+	String computeExchange(IN a);
+
+	/**
+	 * Compute the mandatory flag used in basic.publish method
+	 * See AMQP API help for values.
+	 * A ReturnListener is mandatory if this flag can be true (if not it is ignored and forced to false)
+	 * @param a The data used by the sink
+	 * @return The mandatory flag
+	 */
+	boolean computeMandatory(IN a);
+
+	/**
+	 * Compute the immediate flag
+	 * See AMQP API help for values.
+	 * A ReturnListener is mandatory if this flag can be true (if not it is ignored and forced to false)
+	 * @param a The data used by the sink
+	 * @return The mandatory flag
+	 */
+	boolean computeImmediate(IN a);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/32dea9a0/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
index 53b834d..011ffcc 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
@@ -22,17 +22,22 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
 
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.AMQP.BasicProperties;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.ReturnListener;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -43,14 +48,23 @@ import static org.mockito.Mockito.when;
 public class RMQSinkTest {
 
 	private static final String QUEUE_NAME = "queue";
+	private static final String EXCHANGE = "exchange";
+	private static final String ROUTING_KEY = "application.component.error";
+	private static final String EXPIRATION = "10000";
 	private static final String MESSAGE_STR = "msg";
 	private static final byte[] MESSAGE = new byte[1];
+	private static AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
+			.headers(Collections.singletonMap("Test", "My Value"))
+			.expiration(EXPIRATION)
+			.build();
 
 	private RMQConnectionConfig rmqConnectionConfig;
 	private ConnectionFactory connectionFactory;
 	private Connection connection;
 	private Channel channel;
 	private SerializationSchema<String> serializationSchema;
+	private DummyPublishOptions publishOptions;
+	private DummyReturnHandler returnListener;
 
 	@Before
 	public void before() throws Exception {
@@ -66,13 +80,20 @@ public class RMQSinkTest {
 	}
 
 	@Test
-	public void openCallDeclaresQueue() throws Exception {
+	public void openCallDeclaresQueueInStandardMode() throws Exception {
 		createRMQSink();
 
 		verify(channel).queueDeclare(QUEUE_NAME, false, false, false, null);
 	}
 
 	@Test
+	public void openCallDontDeclaresQueueInWithOptionsMode() throws Exception {
+		createRMQSinkWithOptions(false, false);
+
+		verify(channel, never()).queueDeclare(null, false, false, false, null);
+	}
+
+	@Test
 	public void throwExceptionIfChannelIsNull() throws Exception {
 		when(connection.createChannel()).thenReturn(null);
 		try {
@@ -83,7 +104,22 @@ public class RMQSinkTest {
 	}
 
 	private RMQSink<String> createRMQSink() throws Exception {
-		RMQSink rmqSink = new RMQSink<String>(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+		RMQSink<String> rmqSink = new RMQSink<>(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+		rmqSink.open(new Configuration());
+		return rmqSink;
+	}
+
+	private RMQSink<String> createRMQSinkWithOptions(boolean mandatory, boolean immediate) throws Exception {
+		publishOptions = new DummyPublishOptions(mandatory, immediate);
+		RMQSink<String> rmqSink = new RMQSink<>(rmqConnectionConfig, serializationSchema, publishOptions);
+		rmqSink.open(new Configuration());
+		return rmqSink;
+	}
+
+	private RMQSink<String> createRMQSinkWithOptionsAndReturnHandler(boolean mandatory, boolean immediate) throws Exception {
+		publishOptions = new DummyPublishOptions(mandatory, immediate);
+		returnListener = new DummyReturnHandler();
+		RMQSink<String> rmqSink = new RMQSink<>(rmqConnectionConfig, serializationSchema, publishOptions, returnListener);
 		rmqSink.open(new Configuration());
 		return rmqSink;
 	}
@@ -124,7 +160,115 @@ public class RMQSinkTest {
 		verify(connection).close();
 	}
 
+	@Test
+	public void invokePublishBytesToQueueWithOptions() throws Exception {
+		RMQSink<String> rmqSink = createRMQSinkWithOptions(false, false);
+
+		rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
+		verify(serializationSchema).serialize(MESSAGE_STR);
+		verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false,
+				publishOptions.computeProperties(""), MESSAGE);
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void invokePublishBytesToQueueWithOptionsMandatory() throws Exception {
+		RMQSink<String> rmqSink = createRMQSinkWithOptions(true, false);
+
+		rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void invokePublishBytesToQueueWithOptionsImmediate() throws Exception {
+		RMQSink<String> rmqSink = createRMQSinkWithOptions(false, true);
+
+		rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
+	}
+
+	@Test
+	public void invokePublishBytesToQueueWithOptionsMandatoryReturnHandler() throws Exception {
+		RMQSink<String> rmqSink = createRMQSinkWithOptionsAndReturnHandler(true, false);
+
+		rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
+		verify(serializationSchema).serialize(MESSAGE_STR);
+		verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, true, false,
+				publishOptions.computeProperties(""), MESSAGE);
+	}
+
+	@Test
+	public void invokePublishBytesToQueueWithOptionsImmediateReturnHandler() throws Exception {
+		RMQSink<String> rmqSink = createRMQSinkWithOptionsAndReturnHandler(false, true);
+
+		rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
+		verify(serializationSchema).serialize(MESSAGE_STR);
+		verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, true,
+				publishOptions.computeProperties(""), MESSAGE);
+	}
+
+	@Test(expected = RuntimeException.class)
+	public void exceptionDuringWithOptionsPublishingIsNotIgnored() throws Exception {
+		RMQSink<String> rmqSink = createRMQSinkWithOptions(false, false);
+
+		doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false,
+				publishOptions.computeProperties(""), MESSAGE);
+		rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
+	}
+
+	@Test
+	public void exceptionDuringWithOptionsPublishingIsIgnoredIfLogFailuresOnly() throws Exception {
+		RMQSink<String> rmqSink = createRMQSinkWithOptions(false, false);
+		rmqSink.setLogFailuresOnly(true);
+
+		doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false,
+				publishOptions.computeProperties(""), MESSAGE);
+		rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
+	}
+
+	private class DummyPublishOptions implements RMQSinkPublishOptions<String> {
+		private static final long serialVersionUID = 1L;
+		private boolean mandatory = false;
+		private boolean immediate = false;
+
+		public DummyPublishOptions(boolean mandatory, boolean immediate) {
+			this.mandatory = mandatory;
+			this.immediate = immediate;
+		}
+
+		@Override
+		public String computeRoutingKey(String a) {
+			return ROUTING_KEY;
+		}
+
+		@Override
+		public BasicProperties computeProperties(String a) {
+			return props;
+		}
+
+		@Override
+		public String computeExchange(String a) {
+			return EXCHANGE;
+		}
+
+		@Override
+		public boolean computeMandatory(String a) {
+			return mandatory;
+		}
+
+		@Override
+		public boolean computeImmediate(String a) {
+			return immediate;
+		}
+	}
+
+	private class DummyReturnHandler implements ReturnListener {
+		@Override
+		public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
+				throws IOException {
+		}
+	}
+
 	private class DummySerializationSchema implements SerializationSchema<String> {
+		private static final long serialVersionUID = 1L;
+
 		@Override
 		public byte[] serialize(String element) {
 			return MESSAGE;