You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/09/26 16:04:44 UTC

[2/9] flink git commit: [FLINK-7654] [rabbitmq] Update RabbitMQ Java client version to 4.2.0

[FLINK-7654] [rabbitmq] Update RabbitMQ Java client version to 4.2.0

This closes #4694.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd0841e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd0841e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd0841e7

Branch: refs/heads/master
Commit: bd0841e7b8febb7a78f119db3193e81b96fcd84b
Parents: 4ba3eec
Author: yew1eb <ye...@gmail.com>
Authored: Thu Sep 21 14:38:52 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Sep 26 18:04:07 2017 +0200

----------------------------------------------------------------------
 docs/dev/connectors/rabbitmq.md                 |   6 +-
 .../flink-connector-rabbitmq/README.md          |   6 +-
 .../flink-connector-rabbitmq/pom.xml            |   2 +-
 .../streaming/connectors/rabbitmq/RMQSink.java  |  13 +-
 .../connectors/rabbitmq/RMQSource.java          |   4 +-
 .../connectors/rabbitmq/RMQSinkTest.java        | 133 +++++++++++++++++++
 .../connectors/rabbitmq/RMQSourceTest.java      |   3 +-
 .../connectors/rabbitmq/common/RMQSinkTest.java | 133 -------------------
 8 files changed, 152 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bd0841e7/docs/dev/connectors/rabbitmq.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/rabbitmq.md b/docs/dev/connectors/rabbitmq.md
index d3360e4..5780892 100644
--- a/docs/dev/connectors/rabbitmq.md
+++ b/docs/dev/connectors/rabbitmq.md
@@ -26,16 +26,14 @@ under the License.
 # License of the RabbitMQ Connector
 
 Flink's RabbitMQ connector defines a Maven dependency on the
-"RabbitMQ AMQP Java Client", licensed under the
-[Mozilla Public License v1.1 (MPL 1.1)](https://www.mozilla.org/en-US/MPL/1.1/).
+"RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").
 
 Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client"
 nor packages binaries from the "RabbitMQ AMQP Java Client".
 
 Users that create and publish derivative work based on Flink's
 RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
-must be aware that this may be subject to conditions declared
-in the Mozilla Public License v1.1 (MPL 1.1).
+must be aware that this may be subject to conditions declared in the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").
 
 # RabbitMQ Connector
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd0841e7/flink-connectors/flink-connector-rabbitmq/README.md
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/README.md b/flink-connectors/flink-connector-rabbitmq/README.md
index 4a0e783..de8d1d8 100644
--- a/flink-connectors/flink-connector-rabbitmq/README.md
+++ b/flink-connectors/flink-connector-rabbitmq/README.md
@@ -1,13 +1,11 @@
 # License of the Rabbit MQ Connector
 
 Flink's RabbitMQ connector defines a Maven dependency on the
-"RabbitMQ AMQP Java Client", licensed under the
-Mozilla Public License v1.1 (MPL 1.1).
+"RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").
 
 Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client"
 nor packages binaries from the "RabbitMQ AMQP Java Client".
 
 Users that create and publish derivative work based on Flink's
 RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
-must be aware that this may be subject to conditions declared
-in the Mozilla Public License v1.1 (MPL 1.1).
+must be aware that this may be subject to conditions declared in the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").

http://git-wip-us.apache.org/repos/asf/flink/blob/bd0841e7/flink-connectors/flink-connector-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/pom.xml b/flink-connectors/flink-connector-rabbitmq/pom.xml
index 38337f4..2bb3404 100644
--- a/flink-connectors/flink-connector-rabbitmq/pom.xml
+++ b/flink-connectors/flink-connector-rabbitmq/pom.xml
@@ -37,7 +37,7 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<rabbitmq.version>3.3.1</rabbitmq.version>
+		<rabbitmq.version>4.2.0</rabbitmq.version>
 	</properties>
 
 	<dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/bd0841e7/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 48675c5..b3c6f8c 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.TimeoutException;
 
 /**
  * A Sink for publishing data into RabbitMQ.
@@ -117,15 +118,19 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 
 	@Override
 	public void close() {
-		IOException t = null;
+		Exception t = null;
 		try {
-			channel.close();
-		} catch (IOException e) {
+			if (channel != null) {
+				channel.close();
+			}
+		} catch (IOException | TimeoutException e) {
 			t = e;
 		}
 
 		try {
-			connection.close();
+			if (connection != null) {
+				connection.close();
+			}
 		} catch (IOException e) {
 			if (t != null) {
 				LOG.warn("Both channel and connection closing failed. Logging channel exception and failing with connection exception", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd0841e7/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 12a35f1..5018bcf 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -178,7 +178,9 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	public void close() throws Exception {
 		super.close();
 		try {
-			connection.close();
+			if (connection != null) {
+				connection.close();
+			}
 		} catch (IOException e) {
 			throw new RuntimeException("Error while closing RMQ connection with " + queueName
 				+ " at " + rmqConnectionConfig.getHost(), e);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd0841e7/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
new file mode 100644
index 0000000..93f884b
--- /dev/null
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
+import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RMQSink}.
+ */
+public class RMQSinkTest {
+
+	private static final String QUEUE_NAME = "queue";
+	private static final String MESSAGE_STR = "msg";
+	private static final byte[] MESSAGE = new byte[1];
+
+	private RMQConnectionConfig rmqConnectionConfig;
+	private ConnectionFactory connectionFactory;
+	private Connection connection;
+	private Channel channel;
+	private SerializationSchema<String> serializationSchema;
+
+	@Before
+	public void before() throws Exception {
+		serializationSchema = spy(new DummySerializationSchema());
+		rmqConnectionConfig = mock(RMQConnectionConfig.class);
+		connectionFactory = mock(ConnectionFactory.class);
+		connection = mock(Connection.class);
+		channel = mock(Channel.class);
+
+		when(rmqConnectionConfig.getConnectionFactory()).thenReturn(connectionFactory);
+		when(connectionFactory.newConnection()).thenReturn(connection);
+		when(connection.createChannel()).thenReturn(channel);
+	}
+
+	@Test
+	public void openCallDeclaresQueue() throws Exception {
+		createRMQSink();
+
+		verify(channel).queueDeclare(QUEUE_NAME, false, false, false, null);
+	}
+
+	@Test
+	public void throwExceptionIfChannelIsNull() throws Exception {
+		when(connection.createChannel()).thenReturn(null);
+		try {
+			createRMQSink();
+		} catch (RuntimeException ex) {
+			assertEquals("None of RabbitMQ channels are available", ex.getMessage());
+		}
+	}
+
+	private RMQSink<String> createRMQSink() throws Exception {
+		RMQSink rmqSink = new RMQSink<String>(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+		rmqSink.open(new Configuration());
+		return rmqSink;
+	}
+
+	@Test
+	public void invokePublishBytesToQueue() throws Exception {
+		RMQSink<String> rmqSink = createRMQSink();
+
+		rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
+		verify(serializationSchema).serialize(MESSAGE_STR);
+		verify(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
+	}
+
+	@Test(expected = RuntimeException.class)
+	public void exceptionDuringPublishingIsNotIgnored() throws Exception {
+		RMQSink<String> rmqSink = createRMQSink();
+
+		doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
+		rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
+	}
+
+	@Test
+	public void exceptionDuringPublishingIsIgnoredIfLogFailuresOnly() throws Exception {
+		RMQSink<String> rmqSink = createRMQSink();
+		rmqSink.setLogFailuresOnly(true);
+
+		doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
+		rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
+	}
+
+	@Test
+	public void closeAllResources() throws Exception {
+		RMQSink<String> rmqSink = createRMQSink();
+
+		rmqSink.close();
+
+		verify(channel).close();
+		verify(connection).close();
+	}
+
+	private class DummySerializationSchema implements SerializationSchema<String> {
+		@Override
+		public byte[] serialize(String element) {
+			return MESSAGE;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd0841e7/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index f180e78..0996355 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -53,6 +53,7 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -404,7 +405,7 @@ public class RMQSourceTest {
 			try {
 				Mockito.when(connectionFactory.newConnection()).thenReturn(connection);
 				Mockito.when(connection.createChannel()).thenReturn(Mockito.mock(Channel.class));
-			} catch (IOException e) {
+			} catch (IOException | TimeoutException e) {
 				fail("Test environment couldn't be created.");
 			}
 			return connectionFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/bd0841e7/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
deleted file mode 100644
index 4fb6097..0000000
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.rabbitmq.common;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
-import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the {@link RMQSink}.
- */
-public class RMQSinkTest {
-
-	private static final String QUEUE_NAME = "queue";
-	private static final String MESSAGE_STR = "msg";
-	private static final byte[] MESSAGE = new byte[1];
-
-	private RMQConnectionConfig rmqConnectionConfig;
-	private ConnectionFactory connectionFactory;
-	private Connection connection;
-	private Channel channel;
-	private SerializationSchema<String> serializationSchema;
-
-	@Before
-	public void before() throws Exception {
-		serializationSchema = spy(new DummySerializationSchema());
-		rmqConnectionConfig = mock(RMQConnectionConfig.class);
-		connectionFactory = mock(ConnectionFactory.class);
-		connection = mock(Connection.class);
-		channel = mock(Channel.class);
-
-		when(rmqConnectionConfig.getConnectionFactory()).thenReturn(connectionFactory);
-		when(connectionFactory.newConnection()).thenReturn(connection);
-		when(connection.createChannel()).thenReturn(channel);
-	}
-
-	@Test
-	public void openCallDeclaresQueue() throws Exception {
-		createRMQSink();
-
-		verify(channel).queueDeclare(QUEUE_NAME, false, false, false, null);
-	}
-
-	@Test
-	public void throwExceptionIfChannelIsNull() throws Exception {
-		when(connection.createChannel()).thenReturn(null);
-		try {
-			createRMQSink();
-		} catch (RuntimeException ex) {
-			assertEquals("None of RabbitMQ channels are available", ex.getMessage());
-		}
-	}
-
-	private RMQSink<String> createRMQSink() throws Exception {
-		RMQSink rmqSink = new RMQSink<String>(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
-		rmqSink.open(new Configuration());
-		return rmqSink;
-	}
-
-	@Test
-	public void invokePublishBytesToQueue() throws Exception {
-		RMQSink<String> rmqSink = createRMQSink();
-
-		rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
-		verify(serializationSchema).serialize(MESSAGE_STR);
-		verify(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
-	}
-
-	@Test(expected = RuntimeException.class)
-	public void exceptionDuringPublishingIsNotIgnored() throws Exception {
-		RMQSink<String> rmqSink = createRMQSink();
-
-		doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
-		rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
-	}
-
-	@Test
-	public void exceptionDuringPublishingIsIgnoredIfLogFailuresOnly() throws Exception {
-		RMQSink<String> rmqSink = createRMQSink();
-		rmqSink.setLogFailuresOnly(true);
-
-		doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
-		rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
-	}
-
-	@Test
-	public void closeAllResources() throws Exception {
-		RMQSink<String> rmqSink = createRMQSink();
-
-		rmqSink.close();
-
-		verify(channel).close();
-		verify(connection).close();
-	}
-
-	private class DummySerializationSchema implements SerializationSchema<String> {
-		@Override
-		public byte[] serialize(String element) {
-			return MESSAGE;
-		}
-	}
-}