You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/06/09 13:16:54 UTC
[2/2] flink git commit: [FLINK-3763] RabbitMQ Source/Sink standardize
connection parameters
[FLINK-3763] RabbitMQ Source/Sink standardize connection parameters
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86a80336
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86a80336
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86a80336
Branch: refs/heads/master
Commit: 86a803366e1371ad5af539b74565e7b8ea102580
Parents: 776253c
Author: subhankar <su...@target.com>
Authored: Tue May 31 16:08:27 2016 +0530
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 9 15:16:32 2016 +0200
----------------------------------------------------------------------
docs/apis/streaming/connectors/rabbitmq.md | 28 +-
.../streaming/connectors/rabbitmq/RMQSink.java | 55 ++-
.../connectors/rabbitmq/RMQSource.java | 91 +---
.../rabbitmq/common/RMQConnectionConfig.java | 448 +++++++++++++++++++
.../connectors/rabbitmq/RMQSourceTest.java | 31 +-
.../common/RMQConnectionConfigTest.java | 69 +++
6 files changed, 598 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/docs/apis/streaming/connectors/rabbitmq.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/rabbitmq.md b/docs/apis/streaming/connectors/rabbitmq.md
index b48608d..df8cf80 100644
--- a/docs/apis/streaming/connectors/rabbitmq.md
+++ b/docs/apis/streaming/connectors/rabbitmq.md
@@ -47,7 +47,7 @@ A class which provides an interface for receiving data from RabbitMQ.
The followings have to be provided for the `RMQSource(\u2026)` constructor in order:
-- hostName: The RabbitMQ broker hostname.
+- RMQConnectionConfig.
- queueName: The RabbitMQ queue name.
- usesCorrelationId: `true` when correlation ids should be used, `false` otherwise (default is `false`).
- deserializationSchema: Deserialization schema to turn messages into Java objects.
@@ -71,23 +71,29 @@ Example:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
+RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+.setHost("localhost").setPort(5000).setUserName(..)
+.setPassword(..).setVirtualHost("/").build();
DataStream<String> streamWithoutCorrelationIds = env
- .addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema()))
+ .addSource(new RMQSource<String>(connectionConfig, "hello", new SimpleStringSchema()))
.print
DataStream<String> streamWithCorrelationIds = env
- .addSource(new RMQSource<String>("localhost", "hello", true, new SimpleStringSchema()))
+ .addSource(new RMQSource<String>(connectionConfig, "hello", true, new SimpleStringSchema()))
.print
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
+val connectionConfig = new RMQConnectionConfig.Builder()
+.setHost("localhost").setPort(5000).setUserName(..)
+.setPassword(..).setVirtualHost("/").build()
streamWithoutCorrelationIds = env
- .addSource(new RMQSource[String]("localhost", "hello", new SimpleStringSchema))
+ .addSource(new RMQSource[String](connectionConfig, "hello", new SimpleStringSchema))
.print
streamWithCorrelationIds = env
- .addSource(new RMQSource[String]("localhost", "hello", true, new SimpleStringSchema))
+ .addSource(new RMQSource[String](connectionConfig, "hello", true, new SimpleStringSchema))
.print
{% endhighlight %}
</div>
@@ -98,7 +104,7 @@ A class providing an interface for sending data to RabbitMQ.
The followings have to be provided for the `RMQSink(\u2026)` constructor in order:
-1. The hostname
+1. RMQConnectionConfig
2. The queue name
3. Serialization schema
@@ -107,12 +113,18 @@ Example:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-stream.addSink(new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
+RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+.setHost("localhost").setPort(5000).setUserName(..)
+.setPassword(..).setVirtualHost("/").build();
+stream.addSink(new RMQSink<String>(connectionConfig, "hello", new StringToByteSerializer()));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-stream.addSink(new RMQSink[String]("localhost", "hello", new StringToByteSerializer))
+val connectionConfig = new RMQConnectionConfig.Builder()
+.setHost("localhost").setPort(5000).setUserName(..)
+.setPassword(..).setVirtualHost("/").build()
+stream.addSink(new RMQSink[String](connectionConfig, "hello", new StringToByteSerializer))
{% endhighlight %}
</div>
</div>
http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index ca18fc4..bf0cef7 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,38 +35,26 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
- private String QUEUE_NAME;
- private String HOST_NAME;
- private transient ConnectionFactory factory;
+ private String queueName;
+ private RMQConnectionConfig rmqConnectionConfig;
private transient Connection connection;
private transient Channel channel;
private SerializationSchema<IN> schema;
- public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema<IN> schema) {
- this.HOST_NAME = HOST_NAME;
- this.QUEUE_NAME = QUEUE_NAME;
- this.schema = schema;
- }
-
/**
- * Initializes the connection to RMQ.
- */
- public void initializeConnection() {
- factory = new ConnectionFactory();
- factory.setHost(HOST_NAME);
- try {
- connection = factory.newConnection();
- channel = connection.createChannel();
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ * @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}.
+ * @param queueName The queue to publish messages to.
+ * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes
+ */
+ public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<IN> schema) {
+ this.rmqConnectionConfig = rmqConnectionConfig;
+ this.queueName = queueName;
+ this.schema = schema;
}
/**
* Called when new data arrives to the sink, and forwards it to RMQ.
- *
+ *
* @param value
* The incoming data
*/
@@ -74,11 +63,11 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
try {
byte[] msg = schema.serialize(value);
- channel.basicPublish("", QUEUE_NAME, null, msg);
+ channel.basicPublish("", queueName, null, msg);
} catch (IOException e) {
if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
+ LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost());
}
}
@@ -92,15 +81,23 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
channel.close();
connection.close();
} catch (IOException e) {
- throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
- + " at " + HOST_NAME, e);
+ throw new RuntimeException("Error while closing RMQ connection with " + queueName
+ + " at " + rmqConnectionConfig.getHost(), e);
}
}
@Override
- public void open(Configuration config) {
- initializeConnection();
+ public void open(Configuration config) throws Exception {
+ ConnectionFactory factory = rmqConnectionConfig.getConnectionFactory();
+ try {
+ connection = factory.newConnection();
+ channel = connection.createChannel();
+ channel.queueDeclare(queueName, false, false, false, null);
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 4f6a07f..8297f9c 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import com.rabbitmq.client.Channel;
@@ -66,17 +67,14 @@ import org.slf4j.LoggerFactory;
* @param <OUT> The type of the data read from RabbitMQ.
*/
public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
- implements ResultTypeQueryable<OUT> {
+ implements ResultTypeQueryable<OUT> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
- private final String hostName;
- private final Integer port;
- private final String username;
- private final String password;
- protected final String queueName;
+ private final RMQConnectionConfig rmqConnectionConfig;
+ private final String queueName;
private final boolean usesCorrelationId;
protected DeserializationSchema<OUT> schema;
@@ -92,16 +90,16 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
* Creates a new RabbitMQ source with at-least-once message processing guarantee when
* checkpointing is enabled. No strong delivery guarantees when checkpointing is disabled.
* For exactly-once, please use the constructor
- * {@link RMQSource#RMQSource(String, String, boolean usesCorrelationId, DeserializationSchema)},
+ * {@link RMQSource#RMQSource(RMQConnectionConfig, String, boolean usesCorrelationId, DeserializationSchema)},
* set {@param usesCorrelationId} to true and enable checkpointing.
- * @param hostName The RabbiMQ broker's address to connect to.
+ * @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}.
* @param queueName The queue to receive messages from.
* @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
* into Java objects.
*/
- public RMQSource(String hostName, String queueName,
- DeserializationSchema<OUT> deserializationSchema) {
- this(hostName, null, null, null, queueName, false, deserializationSchema);
+ public RMQSource(RMQConnectionConfig rmqConnectionConfig, String queueName,
+ DeserializationSchema<OUT> deserializationSchema) {
+ this(rmqConnectionConfig, queueName, false, deserializationSchema);
}
/**
@@ -109,7 +107,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
* at the producer. The correlation id must be unique. Otherwise the behavior of the source is
* undefined. In doubt, set {@param usesCorrelationId} to false. When correlation ids are not
* used, this source has at-least-once processing semantics when checkpointing is enabled.
- * @param hostName The RabbitMQ broker's address to connect to.
+ * @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}.
* @param queueName The queue to receive messages from.
* @param usesCorrelationId Whether the messages received are supplied with a <b>unique</b>
* id to deduplicate messages (in case of failed acknowledgments).
@@ -117,53 +115,10 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
* @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
* into Java objects.
*/
- public RMQSource(String hostName, String queueName, boolean usesCorrelationId,
- DeserializationSchema<OUT> deserializationSchema) {
- this(hostName, null, null, null, queueName, usesCorrelationId, deserializationSchema);
- }
-
- /**
- * Creates a new RabbitMQ source. For exactly-once, you must set the correlation ids of messages
- * at the producer. The correlation id must be unique. Otherwise the behavior of the source is
- * undefined. In doubt, set {@param usesCorrelationId} to false. When correlation ids are not
- * used, this source has at-least-once processing semantics when checkpointing is enabled.
- * @param hostName The RabbitMQ broker's address to connect to.
- * @param port The RabbitMQ broker's port.
- * @param queueName The queue to receive messages from.
- * @param usesCorrelationId Whether the messages received are supplied with a <b>unique</b>
- * id to deduplicate messages (in case of failed acknowledgments).
- * Only used when checkpointing is enabled.
- * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
- * into Java objects.
- */
- public RMQSource(String hostName, Integer port,
- String queueName, boolean usesCorrelationId,
- DeserializationSchema<OUT> deserializationSchema) {
- this(hostName, port, null, null, queueName, usesCorrelationId, deserializationSchema);
- }
-
- /**
- * Creates a new RabbitMQ source. For exactly-once, you must set the correlation ids of messages
- * at the producer. The correlation id must be unique. Otherwise the behavior of the source is
- * undefined. In doubt, set {@param usesCorrelationId} to false. When correlation ids are not
- * used, this source has at-least-once processing semantics when checkpointing is enabled.
- * @param hostName The RabbitMQ broker's address to connect to.
- * @param port The RabbitMQ broker's port.
- * @param queueName The queue to receive messages from.
- * @param usesCorrelationId Whether the messages received are supplied with a <b>unique</b>
- * id to deduplicate messages (in case of failed acknowledgments).
- * Only used when checkpointing is enabled.
- * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
- * into Java objects.
- */
- public RMQSource(String hostName, Integer port, String username, String password,
- String queueName, boolean usesCorrelationId,
- DeserializationSchema<OUT> deserializationSchema) {
+ public RMQSource(RMQConnectionConfig rmqConnectionConfig,
+ String queueName, boolean usesCorrelationId,DeserializationSchema<OUT> deserializationSchema) {
super(String.class);
- this.hostName = hostName;
- this.port = port;
- this.username = username;
- this.password = password;
+ this.rmqConnectionConfig = rmqConnectionConfig;
this.queueName = queueName;
this.usesCorrelationId = usesCorrelationId;
this.schema = deserializationSchema;
@@ -173,8 +128,8 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
* Initializes the connection to RMQ with a default connection factory. The user may override
* this method to setup and configure their own ConnectionFactory.
*/
- protected ConnectionFactory setupConnectionFactory() {
- return new ConnectionFactory();
+ protected ConnectionFactory setupConnectionFactory() throws Exception {
+ return rmqConnectionConfig.getConnectionFactory();
}
/**
@@ -189,18 +144,8 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
/**
* Initializes the connection to RMQ.
*/
- private void initializeConnection() {
+ private void initializeConnection() throws Exception {
ConnectionFactory factory = setupConnectionFactory();
- factory.setHost(hostName);
- if (port != null) {
- factory.setPort(port);
- }
- if (username != null) {
- factory.setUsername(username);
- }
- if (password != null) {
- factory.setPassword(password);
- }
try {
connection = factory.newConnection();
channel = connection.createChannel();
@@ -222,7 +167,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
} catch (IOException e) {
throw new RuntimeException("Cannot create RMQ connection with " + queueName + " at "
- + hostName, e);
+ + rmqConnectionConfig.getHost(), e);
}
}
@@ -240,7 +185,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
connection.close();
} catch (IOException e) {
throw new RuntimeException("Error while closing RMQ connection with " + queueName
- + " at " + hostName, e);
+ + " at " + rmqConnectionConfig.getHost(), e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
new file mode 100644
index 0000000..0ce7e79
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.rabbitmq.client.ConnectionFactory;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * Connection Configuration for RMQ.
+ * If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer,
+ * Boolean, Boolean, Integer, Integer, Integer, Integer)}
+ * will be used for initialize the RMQ connection or
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String, Integer, Boolean,
+ * Boolean, Integer, Integer, Integer, Integer)}
+ * will be used for initialize the RMQ connection
+ */
+public class RMQConnectionConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class);
+
+ private String host;
+ private Integer port;
+ private String virtualHost;
+ private String username;
+ private String password;
+ private String uri;
+
+ private Integer networkRecoveryInterval;
+ private Boolean automaticRecovery;
+ private Boolean topologyRecovery;
+
+ private Integer connectionTimeout;
+ private Integer requestedChannelMax;
+ private Integer requestedFrameMax;
+ private Integer requestedHeartbeat;
+
+ /**
+ *
+ * @param host host name
+ * @param port port
+ * @param virtualHost virtual host
+ * @param username username
+ * @param password password
+ * @param networkRecoveryInterval connection recovery interval in milliseconds
+ * @param automaticRecovery if automatic connection recovery
+ * @param topologyRecovery if topology recovery
+ * @param connectionTimeout connection timeout
+ * @param requestedChannelMax requested maximum channel number
+ * @param requestedFrameMax requested maximum frame size
+ * @param requestedHeartbeat requested heartbeat interval
+ * @throws NullPointerException if host or virtual host or username or password is null
+ */
+ private RMQConnectionConfig(String host, Integer port, String virtualHost, String username, String password,
+ Integer networkRecoveryInterval, Boolean automaticRecovery,
+ Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
+ Integer requestedFrameMax, Integer requestedHeartbeat){
+ Preconditions.checkNotNull(host, "host can not be null");
+ Preconditions.checkNotNull(port, "port can not be null");
+ Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");
+ Preconditions.checkNotNull(username, "username can not be null");
+ Preconditions.checkNotNull(password, "password can not be null");
+ this.host = host;
+ this.port = port;
+ this.virtualHost = virtualHost;
+ this.username = username;
+ this.password = password;
+
+ this.networkRecoveryInterval = networkRecoveryInterval;
+ this.automaticRecovery = automaticRecovery;
+ this.topologyRecovery = topologyRecovery;
+ this.connectionTimeout = connectionTimeout;
+ this.requestedChannelMax = requestedChannelMax;
+ this.requestedFrameMax = requestedFrameMax;
+ this.requestedHeartbeat = requestedHeartbeat;
+ }
+
+ /**
+ *
+ * @param uri the connection URI
+ * @param networkRecoveryInterval connection recovery interval in milliseconds
+ * @param automaticRecovery if automatic connection recovery
+ * @param topologyRecovery if topology recovery
+ * @param connectionTimeout connection timeout
+ * @param requestedChannelMax requested maximum channel number
+ * @param requestedFrameMax requested maximum frame size
+ * @param requestedHeartbeat requested heartbeat interval
+ * @throws NullPointerException if URI is null
+ */
+ private RMQConnectionConfig(String uri, Integer networkRecoveryInterval, Boolean automaticRecovery,
+ Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
+ Integer requestedFrameMax, Integer requestedHeartbeat){
+ Preconditions.checkNotNull(uri, "Uri can not be null");
+ this.uri = uri;
+
+ this.networkRecoveryInterval = networkRecoveryInterval;
+ this.automaticRecovery = automaticRecovery;
+ this.topologyRecovery = topologyRecovery;
+ this.connectionTimeout = connectionTimeout;
+ this.requestedChannelMax = requestedChannelMax;
+ this.requestedFrameMax = requestedFrameMax;
+ this.requestedHeartbeat = requestedHeartbeat;
+ }
+
+ /** @return the host to use for connections */
+ public String getHost() {
+ return host;
+ }
+
+ /** @return the port to use for connections */
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Retrieve the virtual host.
+ * @return the virtual host to use when connecting to the broker
+ */
+ public String getVirtualHost() {
+ return virtualHost;
+ }
+
+ /**
+ * Retrieve the user name.
+ * @return the AMQP user name to use when connecting to the broker
+ */
+ public String getUsername() {
+ return username;
+ }
+
+ /**
+ * Retrieve the password.
+ * @return the password to use when connecting to the broker
+ */
+ public String getPassword() {
+ return password;
+ }
+
+ /**
+ * Retrieve the URI.
+ * @return the connection URI when connecting to the broker
+ */
+ public String getUri() {
+ return uri;
+ }
+
+ /**
+ * Returns automatic connection recovery interval in milliseconds.
+ * @return how long will automatic recovery wait before attempting to reconnect, in ms; default is 5000
+ */
+ public Integer getNetworkRecoveryInterval() {
+ return networkRecoveryInterval;
+ }
+
+ /**
+ * Returns true if automatic connection recovery is enabled, false otherwise
+ * @return true if automatic connection recovery is enabled, false otherwise
+ */
+ public Boolean isAutomaticRecovery() {
+ return automaticRecovery;
+ }
+
+ /**
+ * Returns true if topology recovery is enabled, false otherwise
+ * @return true if topology recovery is enabled, false otherwise
+ */
+ public Boolean isTopologyRecovery() {
+ return topologyRecovery;
+ }
+
+ /**
+ * Retrieve the connection timeout.
+ * @return the connection timeout, in milliseconds; zero for infinite
+ */
+ public Integer getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ /**
+ * Retrieve the requested maximum channel number
+ * @return the initially requested maximum channel number; zero for unlimited
+ */
+ public Integer getRequestedChannelMax() {
+ return requestedChannelMax;
+ }
+
+ /**
+ * Retrieve the requested maximum frame size
+ * @return the initially requested maximum frame size, in octets; zero for unlimited
+ */
+ public Integer getRequestedFrameMax() {
+ return requestedFrameMax;
+ }
+
+ /**
+ * Retrieve the requested heartbeat interval.
+ * @return the initially requested heartbeat interval, in seconds; zero for none
+ */
+ public Integer getRequestedHeartbeat() {
+ return requestedHeartbeat;
+ }
+
+ /**
+ *
+ * @return Connection Factory for RMQ
+ * @throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException if Malformed URI has been passed
+ */
+ public ConnectionFactory getConnectionFactory() throws URISyntaxException,
+ NoSuchAlgorithmException, KeyManagementException {
+ ConnectionFactory factory = new ConnectionFactory();
+ if (this.uri != null && !this.uri.isEmpty()){
+ try {
+ factory.setUri(getUri());
+ }catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e){
+ LOG.error("Failed to parse uri {}", e.getMessage());
+ throw e;
+ }
+ } else {
+ factory.setHost(this.host);
+ factory.setPort(this.port);
+ factory.setVirtualHost(this.virtualHost);
+ factory.setUsername(this.username);
+ factory.setPassword(this.password);
+ }
+
+ if (this.automaticRecovery != null) {
+ factory.setAutomaticRecoveryEnabled(this.automaticRecovery);
+ }
+ if (this.connectionTimeout != null) {
+ factory.setConnectionTimeout(this.connectionTimeout);
+ }
+ if (this.networkRecoveryInterval != null) {
+ factory.setNetworkRecoveryInterval(this.networkRecoveryInterval);
+ }
+ if (this.requestedHeartbeat != null) {
+ factory.setRequestedHeartbeat(this.requestedHeartbeat);
+ }
+ if (this.topologyRecovery != null) {
+ factory.setTopologyRecoveryEnabled(this.topologyRecovery);
+ }
+ if (this.requestedChannelMax != null) {
+ factory.setRequestedChannelMax(this.requestedChannelMax);
+ }
+ if (this.requestedFrameMax != null) {
+ factory.setRequestedFrameMax(this.requestedFrameMax);
+ }
+
+ return factory;
+ }
+
+ /**
+ * The Builder Class for {@link RMQConnectionConfig}
+ */
+ public static class Builder {
+
+ private String host;
+ private Integer port;
+ private String virtualHost;
+ private String username;
+ private String password;
+
+ private Integer networkRecoveryInterval;
+ private Boolean automaticRecovery;
+ private Boolean topologyRecovery;
+
+ private Integer connectionTimeout;
+ private Integer requestedChannelMax;
+ private Integer requestedFrameMax;
+ private Integer requestedHeartbeat;
+
+ private String uri;
+
+ /**
+ * Set the target port.
+ * @param port the default port to use for connections
+ * @return the Builder
+ */
+ public Builder setPort(int port) {
+ this.port = port;
+ return this;
+ }
+
+ /** @param host the default host to use for connections
+ * @return the Builder
+ */
+ public Builder setHost(String host) {
+ this.host = host;
+ return this;
+ }
+
+ /**
+ * Set the virtual host.
+ * @param virtualHost the virtual host to use when connecting to the broker
+ * @return the Builder
+ */
+ public Builder setVirtualHost(String virtualHost) {
+ this.virtualHost = virtualHost;
+ return this;
+ }
+
+ /**
+ * Set the user name.
+ * @param username the AMQP user name to use when connecting to the broker
+ * @return the Builder
+ */
+ public Builder setUserName(String username) {
+ this.username = username;
+ return this;
+ }
+
+ /**
+ * Set the password.
+ * @param password the password to use when connecting to the broker
+ * @return the Builder
+ */
+ public Builder setPassword(String password) {
+ this.password = password;
+ return this;
+ }
+
+ /**
+ * Convenience method for setting the fields in an AMQP URI: host,
+ * port, username, password and virtual host. If any part of the
+ * URI is ommited, the ConnectionFactory's corresponding variable
+ * is left unchanged.
+ * @param uri is the AMQP URI containing the data
+ * @return the Builder
+ */
+ public Builder setUri(String uri) {
+ this.uri = uri;
+ return this;
+ }
+
+ /**
+ * Enables or disables topology recovery
+ * @param topologyRecovery if true, enables topology recovery
+ * @return the Builder
+ */
+ public Builder setTopologyRecoveryEnabled(boolean topologyRecovery) {
+ this.topologyRecovery = topologyRecovery;
+ return this;
+ }
+
+ /**
+ * Set the requested heartbeat.
+ * @param requestedHeartbeat the initially requested heartbeat interval, in seconds; zero for none
+ * @return the Builder
+ */
+ public Builder setRequestedHeartbeat(int requestedHeartbeat) {
+ this.requestedHeartbeat = requestedHeartbeat;
+ return this;
+ }
+
+ /**
+ * Set the requested maximum frame size
+ * @param requestedFrameMax initially requested maximum frame size, in octets; zero for unlimited
+ * @return the Builder
+ */
+ public Builder setRequestedFrameMax(int requestedFrameMax) {
+ this.requestedFrameMax = requestedFrameMax;
+ return this;
+ }
+
+ /**
+ * Set the requested maximum channel number
+ * @param requestedChannelMax initially requested maximum channel number; zero for unlimited
+ */
+ public Builder setRequestedChannelMax(int requestedChannelMax) {
+ this.requestedChannelMax = requestedChannelMax;
+ return this;
+ }
+
+ /**
+ * Sets connection recovery interval. Default is 5000.
+ * @param networkRecoveryInterval how long will automatic recovery wait before attempting to reconnect, in ms
+ * @return the Builder
+ */
+ public Builder setNetworkRecoveryInterval(int networkRecoveryInterval) {
+ this.networkRecoveryInterval = networkRecoveryInterval;
+ return this;
+ }
+
+ /**
+ * Set the connection timeout.
+ * @param connectionTimeout connection establishment timeout in milliseconds; zero for infinite
+ * @return the Builder
+ */
+ public Builder setConnectionTimeout(int connectionTimeout) {
+ this.connectionTimeout = connectionTimeout;
+ return this;
+ }
+
+ /**
+ * Enables or disables automatic connection recovery
+ * @param automaticRecovery if true, enables connection recovery
+ * @return the Builder
+ */
+ public Builder setAutomaticRecovery(boolean automaticRecovery) {
+ this.automaticRecovery = automaticRecovery;
+ return this;
+ }
+
+ /**
+ * The Builder method
+ * If URI is NULL we use host, port, vHost, username, password combination
+ * to initialize connection. using {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String,
+ * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}
+ *
+ * else URI will be used to initialize the client connection
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}
+ * @return RMQConnectionConfig
+ */
+ public RMQConnectionConfig build(){
+ if(this.uri != null) {
+ return new RMQConnectionConfig(this.uri, this.networkRecoveryInterval,
+ this.automaticRecovery, this.topologyRecovery, this.connectionTimeout, this.requestedChannelMax,
+ this.requestedFrameMax, this.requestedHeartbeat);
+ } else {
+ return new RMQConnectionConfig(this.host, this.port, this.virtualHost, this.username, this.password,
+ this.networkRecoveryInterval, this.automaticRecovery, this.topologyRecovery,
+ this.connectionTimeout, this.requestedChannelMax, this.requestedFrameMax, this.requestedHeartbeat);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 21f185f..31128a9 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.state.SerializedCheckpointData;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.junit.After;
import org.junit.Before;
@@ -220,11 +221,12 @@ public class RMQSourceTest {
* Tests whether constructor params are passed correctly.
*/
@Test
- public void testConstructorParams() {
+ public void testConstructorParams() throws Exception {
// verify construction params
+ RMQConnectionConfig.Builder builder = new RMQConnectionConfig.Builder();
+ builder.setHost("hostTest").setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/");
ConstructorTestClass testObj = new ConstructorTestClass(
- "hostTest", 999, "userTest", "passTest",
- "queueTest", false, new StringDeserializationScheme());
+ builder.build(), "queueTest", false, new StringDeserializationScheme());
try {
testObj.open(new Configuration());
@@ -240,17 +242,16 @@ public class RMQSourceTest {
private static class ConstructorTestClass extends RMQSource<String> {
- private ConnectionFactory factory = Mockito.spy(new ConnectionFactory());
-
- public ConstructorTestClass(String hostName, Integer port,
- String username,
- String password,
- String queueName,
- boolean usesCorrelationId,
- DeserializationSchema<String> deserializationSchema) {
- super(hostName, port, username, password,
- queueName, usesCorrelationId, deserializationSchema);
+ private ConnectionFactory factory;
+ public ConstructorTestClass(RMQConnectionConfig rmqConnectionConfig,
+ String queueName,
+ boolean usesCorrelationId,
+ DeserializationSchema<String> deserializationSchema) throws Exception {
+ super(rmqConnectionConfig, queueName, usesCorrelationId, deserializationSchema);
+ RMQConnectionConfig.Builder builder = new RMQConnectionConfig.Builder();
+ builder.setHost("hostTest").setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/");
+ factory = Mockito.spy(builder.build().getConnectionFactory());
try {
Mockito.doThrow(new RuntimeException()).when(factory).newConnection();
} catch (IOException e) {
@@ -295,7 +296,9 @@ public class RMQSourceTest {
private class RMQTestSource extends RMQSource<String> {
public RMQTestSource() {
- super("hostDummy", -1, "", "", "queueDummy", true, new StringDeserializationScheme());
+ super(new RMQConnectionConfig.Builder().setHost("hostTest")
+ .setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/").build()
+ , "queueDummy", true, new StringDeserializationScheme());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
new file mode 100644
index 0000000..40985ce
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.rabbitmq.client.ConnectionFactory;
+import org.junit.Test;
+
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class RMQConnectionConfigTest {
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointExceptionIfHostIsNull() throws NoSuchAlgorithmException,
+ KeyManagementException, URISyntaxException {
+ RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+ .setPort(1000).setUserName("guest")
+ .setPassword("guest").setVirtualHost("/").build();
+ connectionConfig.getConnectionFactory();
+ }
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointExceptionIfPortIsNull() throws NoSuchAlgorithmException,
+ KeyManagementException, URISyntaxException {
+ RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+ .setHost("localhost").setUserName("guest")
+ .setPassword("guest").setVirtualHost("/").build();
+ connectionConfig.getConnectionFactory();
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldSetDefaultValueIfConnectionTimeoutNotGiven() throws NoSuchAlgorithmException,
+ KeyManagementException, URISyntaxException {
+ RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+ .setHost("localhost").setUserName("guest")
+ .setPassword("guest").setVirtualHost("/").build();
+ ConnectionFactory factory = connectionConfig.getConnectionFactory();
+ assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, factory.getConnectionTimeout());
+ }
+
+ @Test
+ public void shouldSetProvidedValueIfConnectionTimeoutNotGiven() throws NoSuchAlgorithmException,
+ KeyManagementException, URISyntaxException {
+ RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+ .setHost("localhost").setPort(5000).setUserName("guest")
+ .setPassword("guest").setVirtualHost("/")
+ .setConnectionTimeout(5000).build();
+ ConnectionFactory factory = connectionConfig.getConnectionFactory();
+ assertEquals(5000, factory.getConnectionTimeout());
+ }
+}