You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/06/09 13:16:54 UTC

[2/2] flink git commit: [FLINK-3763] RabbitMQ Source/Sink standardize connection parameters

[FLINK-3763] RabbitMQ Source/Sink standardize connection parameters


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86a80336
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86a80336
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86a80336

Branch: refs/heads/master
Commit: 86a803366e1371ad5af539b74565e7b8ea102580
Parents: 776253c
Author: subhankar <su...@target.com>
Authored: Tue May 31 16:08:27 2016 +0530
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 9 15:16:32 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/connectors/rabbitmq.md      |  28 +-
 .../streaming/connectors/rabbitmq/RMQSink.java  |  55 ++-
 .../connectors/rabbitmq/RMQSource.java          |  91 +---
 .../rabbitmq/common/RMQConnectionConfig.java    | 448 +++++++++++++++++++
 .../connectors/rabbitmq/RMQSourceTest.java      |  31 +-
 .../common/RMQConnectionConfigTest.java         |  69 +++
 6 files changed, 598 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/docs/apis/streaming/connectors/rabbitmq.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/rabbitmq.md b/docs/apis/streaming/connectors/rabbitmq.md
index b48608d..df8cf80 100644
--- a/docs/apis/streaming/connectors/rabbitmq.md
+++ b/docs/apis/streaming/connectors/rabbitmq.md
@@ -47,7 +47,7 @@ A class which provides an interface for receiving data from RabbitMQ.
 
 The followings have to be provided for the `RMQSource(\u2026)` constructor in order:
 
-- hostName: The RabbitMQ broker hostname.
+- RMQConnectionConfig.
 - queueName: The RabbitMQ queue name.
 - usesCorrelationId: `true` when correlation ids should be used, `false` otherwise (default is `false`).
 - deserializationSchema: Deserialization schema to turn messages into Java objects.
@@ -71,23 +71,29 @@ Example:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
+RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+.setHost("localhost").setPort(5000).setUserName(..)
+.setPassword(..).setVirtualHost("/").build();
 DataStream<String> streamWithoutCorrelationIds = env
-	.addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema()))
+	.addSource(new RMQSource<String>(connectionConfig, "hello", new SimpleStringSchema()))
 	.print
 
 DataStream<String> streamWithCorrelationIds = env
-	.addSource(new RMQSource<String>("localhost", "hello", true, new SimpleStringSchema()))
+	.addSource(new RMQSource<String>(connectionConfig, "hello", true, new SimpleStringSchema()))
 	.print
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
+val connectionConfig = new RMQConnectionConfig.Builder()
+.setHost("localhost").setPort(5000).setUserName(..)
+.setPassword(..).setVirtualHost("/").build()
 streamWithoutCorrelationIds = env
-    .addSource(new RMQSource[String]("localhost", "hello", new SimpleStringSchema))
+    .addSource(new RMQSource[String](connectionConfig, "hello", new SimpleStringSchema))
     .print
 
 streamWithCorrelationIds = env
-    .addSource(new RMQSource[String]("localhost", "hello", true, new SimpleStringSchema))
+    .addSource(new RMQSource[String](connectionConfig, "hello", true, new SimpleStringSchema))
     .print
 {% endhighlight %}
 </div>
@@ -98,7 +104,7 @@ A class providing an interface for sending data to RabbitMQ.
 
 The followings have to be provided for the `RMQSink(\u2026)` constructor in order:
 
-1. The hostname
+1. RMQConnectionConfig
 2. The queue name
 3. Serialization schema
 
@@ -107,12 +113,18 @@ Example:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-stream.addSink(new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
+RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+.setHost("localhost").setPort(5000).setUserName(..)
+.setPassword(..).setVirtualHost("/").build();
+stream.addSink(new RMQSink<String>(connectionConfig, "hello", new StringToByteSerializer()));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-stream.addSink(new RMQSink[String]("localhost", "hello", new StringToByteSerializer))
+val connectionConfig = new RMQConnectionConfig.Builder()
+.setHost("localhost").setPort(5000).setUserName(..)
+.setPassword(..).setVirtualHost("/").build()
+stream.addSink(new RMQSink[String](connectionConfig, "hello", new StringToByteSerializer))
 {% endhighlight %}
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index ca18fc4..bf0cef7 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,38 +35,26 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
 
-	private String QUEUE_NAME;
-	private String HOST_NAME;
-	private transient ConnectionFactory factory;
+	private String queueName;
+	private RMQConnectionConfig rmqConnectionConfig;
 	private transient Connection connection;
 	private transient Channel channel;
 	private SerializationSchema<IN> schema;
 
-	public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema<IN> schema) {
-		this.HOST_NAME = HOST_NAME;
-		this.QUEUE_NAME = QUEUE_NAME;
-		this.schema = schema;
-	}
-
 	/**
-	 * Initializes the connection to RMQ.
-	 */
-	public void initializeConnection() {
-		factory = new ConnectionFactory();
-		factory.setHost(HOST_NAME);
-		try {
-			connection = factory.newConnection();
-			channel = connection.createChannel();
-			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
+	 * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
+	 * @param queueName The queue to publish messages to.
+	 * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes
+     */
+	public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<IN> schema) {
+		this.rmqConnectionConfig = rmqConnectionConfig;
+		this.queueName = queueName;
+		this.schema = schema;
 	}
 
 	/**
 	 * Called when new data arrives to the sink, and forwards it to RMQ.
-	 * 
+	 *
 	 * @param value
 	 *            The incoming data
 	 */
@@ -74,11 +63,11 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 		try {
 			byte[] msg = schema.serialize(value);
 
-			channel.basicPublish("", QUEUE_NAME, null, msg);
+			channel.basicPublish("", queueName, null, msg);
 
 		} catch (IOException e) {
 			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
+				LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost());
 			}
 		}
 
@@ -92,15 +81,23 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 			channel.close();
 			connection.close();
 		} catch (IOException e) {
-			throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
-					+ " at " + HOST_NAME, e);
+			throw new RuntimeException("Error while closing RMQ connection with " + queueName
+				+ " at " + rmqConnectionConfig.getHost(), e);
 		}
 
 	}
 
 	@Override
-	public void open(Configuration config) {
-		initializeConnection();
+	public void open(Configuration config) throws Exception {
+		ConnectionFactory factory = rmqConnectionConfig.getConnectionFactory();
+		try {
+			connection = factory.newConnection();
+			channel = connection.createChannel();
+			channel.queueDeclare(queueName, false, false, false, null);
+
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 4f6a07f..8297f9c 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
 import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 
 import com.rabbitmq.client.Channel;
@@ -66,17 +67,14 @@ import org.slf4j.LoggerFactory;
  * @param <OUT> The type of the data read from RabbitMQ.
  */
 public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
-		implements ResultTypeQueryable<OUT> {
+	implements ResultTypeQueryable<OUT> {
 
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
 
-	private final String hostName;
-	private final Integer port;
-	private final String username;
-	private final String password;
-	protected final String queueName;
+	private final RMQConnectionConfig rmqConnectionConfig;
+	private final String queueName;
 	private final boolean usesCorrelationId;
 	protected DeserializationSchema<OUT> schema;
 
@@ -92,16 +90,16 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	 * Creates a new RabbitMQ source with at-least-once message processing guarantee when
 	 * checkpointing is enabled. No strong delivery guarantees when checkpointing is disabled.
 	 * For exactly-once, please use the constructor
-	 * {@link RMQSource#RMQSource(String, String, boolean usesCorrelationId, DeserializationSchema)},
+	 * {@link RMQSource#RMQSource(RMQConnectionConfig, String, boolean usesCorrelationId, DeserializationSchema)},
 	 * set {@param usesCorrelationId} to true and enable checkpointing.
-	 * @param hostName The RabbiMQ broker's address to connect to.
+	 * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
 	 * @param queueName  The queue to receive messages from.
 	 * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
 	 *               				into Java objects.
 	 */
-	public RMQSource(String hostName, String queueName,
-				DeserializationSchema<OUT> deserializationSchema) {
-		this(hostName, null, null, null, queueName, false, deserializationSchema);
+	public RMQSource(RMQConnectionConfig rmqConnectionConfig, String queueName,
+					DeserializationSchema<OUT> deserializationSchema) {
+		this(rmqConnectionConfig, queueName, false, deserializationSchema);
 	}
 
 	/**
@@ -109,7 +107,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	 * at the producer. The correlation id must be unique. Otherwise the behavior of the source is
 	 * undefined. In doubt, set {@param usesCorrelationId} to false. When correlation ids are not
 	 * used, this source has at-least-once processing semantics when checkpointing is enabled.
-	 * @param hostName The RabbitMQ broker's address to connect to.
+	 * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
 	 * @param queueName The queue to receive messages from.
 	 * @param usesCorrelationId Whether the messages received are supplied with a <b>unique</b>
 	 *                          id to deduplicate messages (in case of failed acknowledgments).
@@ -117,53 +115,10 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	 * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
 	 *                              into Java objects.
 	 */
-	public RMQSource(String hostName, String queueName, boolean usesCorrelationId,
-				DeserializationSchema<OUT> deserializationSchema) {
-		this(hostName, null, null, null, queueName, usesCorrelationId, deserializationSchema);
-	}
-
-	/**
-	 * Creates a new RabbitMQ source. For exactly-once, you must set the correlation ids of messages
-	 * at the producer. The correlation id must be unique. Otherwise the behavior of the source is
-	 * undefined. In doubt, set {@param usesCorrelationId} to false. When correlation ids are not
-	 * used, this source has at-least-once processing semantics when checkpointing is enabled.
-	 * @param hostName The RabbitMQ broker's address to connect to.
-	 * @param port The RabbitMQ broker's port.
-	 * @param queueName The queue to receive messages from.
-	 * @param usesCorrelationId Whether the messages received are supplied with a <b>unique</b>
-	 *                          id to deduplicate messages (in case of failed acknowledgments).
-	 *                          Only used when checkpointing is enabled.
-	 * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
-	 *                              into Java objects.
-	 */
-	public RMQSource(String hostName, Integer port,
-				String queueName, boolean usesCorrelationId,
-				DeserializationSchema<OUT> deserializationSchema) {
-		this(hostName, port, null, null, queueName, usesCorrelationId, deserializationSchema);
-	}
-
-	/**
-	 * Creates a new RabbitMQ source. For exactly-once, you must set the correlation ids of messages
-	 * at the producer. The correlation id must be unique. Otherwise the behavior of the source is
-	 * undefined. In doubt, set {@param usesCorrelationId} to false. When correlation ids are not
-	 * used, this source has at-least-once processing semantics when checkpointing is enabled.
-	 * @param hostName The RabbitMQ broker's address to connect to.
-	 * @param port The RabbitMQ broker's port.
-	 * @param queueName The queue to receive messages from.
-	 * @param usesCorrelationId Whether the messages received are supplied with a <b>unique</b>
-	 *                          id to deduplicate messages (in case of failed acknowledgments).
-	 *                          Only used when checkpointing is enabled.
-	 * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
-	 *                              into Java objects.
-	 */
-	public RMQSource(String hostName, Integer port, String username, String password,
-				String queueName, boolean usesCorrelationId,
-				DeserializationSchema<OUT> deserializationSchema) {
+	public RMQSource(RMQConnectionConfig rmqConnectionConfig,
+					String queueName, boolean usesCorrelationId,DeserializationSchema<OUT> deserializationSchema) {
 		super(String.class);
-		this.hostName = hostName;
-		this.port = port;
-		this.username = username;
-		this.password = password;
+		this.rmqConnectionConfig = rmqConnectionConfig;
 		this.queueName = queueName;
 		this.usesCorrelationId = usesCorrelationId;
 		this.schema = deserializationSchema;
@@ -173,8 +128,8 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	 * Initializes the connection to RMQ with a default connection factory. The user may override
 	 * this method to setup and configure their own ConnectionFactory.
 	 */
-	protected ConnectionFactory setupConnectionFactory() {
-		return new ConnectionFactory();
+	protected ConnectionFactory setupConnectionFactory() throws Exception {
+		return rmqConnectionConfig.getConnectionFactory();
 	}
 
 	/**
@@ -189,18 +144,8 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	/**
 	 * Initializes the connection to RMQ.
 	 */
-	private void initializeConnection() {
+	private void initializeConnection() throws Exception {
 		ConnectionFactory factory = setupConnectionFactory();
-		factory.setHost(hostName);
-		if (port != null) {
-			factory.setPort(port);
-		}
-		if (username != null) {
-			factory.setUsername(username);
-		}
-		if (password != null) {
-			factory.setPassword(password);
-		}
 		try {
 			connection = factory.newConnection();
 			channel = connection.createChannel();
@@ -222,7 +167,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 
 		} catch (IOException e) {
 			throw new RuntimeException("Cannot create RMQ connection with " + queueName + " at "
-					+ hostName, e);
+				+ rmqConnectionConfig.getHost(), e);
 		}
 	}
 
@@ -240,7 +185,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 			connection.close();
 		} catch (IOException e) {
 			throw new RuntimeException("Error while closing RMQ connection with " + queueName
-					+ " at " + hostName, e);
+				+ " at " + rmqConnectionConfig.getHost(), e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
new file mode 100644
index 0000000..0ce7e79
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.rabbitmq.client.ConnectionFactory;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * Connection Configuration for RMQ.
+ * If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer,
+ * Boolean, Boolean, Integer, Integer, Integer, Integer)}
+ * will be used to initialize the RMQ connection; otherwise
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String, Integer, Boolean,
+ * Boolean, Integer, Integer, Integer, Integer)}
+ * will be used to initialize the RMQ connection.
+ */
+public class RMQConnectionConfig implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class);
+
+	private String host;
+	private Integer port;
+	private String virtualHost;
+	private String username;
+	private String password;
+	private String uri;
+
+	private Integer networkRecoveryInterval;
+	private Boolean automaticRecovery;
+	private Boolean topologyRecovery;
+
+	private Integer connectionTimeout;
+	private Integer requestedChannelMax;
+	private Integer requestedFrameMax;
+	private Integer requestedHeartbeat;
+
+	/**
+	*
+	* @param host host name
+	* @param port port
+	* @param virtualHost virtual host
+	* @param username username
+	* @param password password
+	* @param networkRecoveryInterval connection recovery interval in milliseconds
+	* @param automaticRecovery if automatic connection recovery
+	* @param topologyRecovery if topology recovery
+	* @param connectionTimeout connection timeout
+	* @param requestedChannelMax requested maximum channel number
+	* @param requestedFrameMax requested maximum frame size
+	* @param requestedHeartbeat requested heartbeat interval
+	* @throws NullPointerException if host, port, virtual host, username or password is null
+    */
+	private RMQConnectionConfig(String host, Integer port, String virtualHost, String username, String password,
+								Integer networkRecoveryInterval, Boolean automaticRecovery,
+								Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
+								Integer requestedFrameMax, Integer requestedHeartbeat){
+		Preconditions.checkNotNull(host, "host can not be null");
+		Preconditions.checkNotNull(port, "port can not be null");
+		Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");
+		Preconditions.checkNotNull(username, "username can not be null");
+		Preconditions.checkNotNull(password, "password can not be null");
+		this.host = host;
+		this.port = port;
+		this.virtualHost = virtualHost;
+		this.username = username;
+		this.password = password;
+
+		this.networkRecoveryInterval = networkRecoveryInterval;
+		this.automaticRecovery = automaticRecovery;
+		this.topologyRecovery = topologyRecovery;
+		this.connectionTimeout = connectionTimeout;
+		this.requestedChannelMax = requestedChannelMax;
+		this.requestedFrameMax = requestedFrameMax;
+		this.requestedHeartbeat = requestedHeartbeat;
+	}
+
+	/**
+	*
+	* @param uri the connection URI
+	* @param networkRecoveryInterval connection recovery interval in milliseconds
+	* @param automaticRecovery if automatic connection recovery
+	* @param topologyRecovery if topology recovery
+	* @param connectionTimeout connection timeout
+	* @param requestedChannelMax requested maximum channel number
+	* @param requestedFrameMax requested maximum frame size
+	* @param requestedHeartbeat requested heartbeat interval
+	* @throws NullPointerException if URI is null
+	*/
+	private RMQConnectionConfig(String uri, Integer networkRecoveryInterval, Boolean automaticRecovery,
+								Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
+								Integer requestedFrameMax, Integer requestedHeartbeat){
+		Preconditions.checkNotNull(uri, "Uri can not be null");
+		this.uri = uri;
+
+		this.networkRecoveryInterval = networkRecoveryInterval;
+		this.automaticRecovery = automaticRecovery;
+		this.topologyRecovery = topologyRecovery;
+		this.connectionTimeout = connectionTimeout;
+		this.requestedChannelMax = requestedChannelMax;
+		this.requestedFrameMax = requestedFrameMax;
+		this.requestedHeartbeat = requestedHeartbeat;
+	}
+
+	/** @return the host to use for connections */
+	public String getHost() {
+		return host;
+	}
+
+	/** @return the port to use for connections */
+	public int getPort() {
+		return port;
+	}
+
+	/**
+	 * Retrieve the virtual host.
+	 * @return the virtual host to use when connecting to the broker
+	 */
+	public String getVirtualHost() {
+		return virtualHost;
+	}
+
+	/**
+	 * Retrieve the user name.
+	 * @return the AMQP user name to use when connecting to the broker
+	 */
+	public String getUsername() {
+		return username;
+	}
+
+	/**
+	 * Retrieve the password.
+	 * @return the password to use when connecting to the broker
+	 */
+	public String getPassword() {
+		return password;
+	}
+
+	/**
+	 * Retrieve the URI.
+	 * @return the connection URI when connecting to the broker
+     */
+	public String getUri() {
+		return uri;
+	}
+
+	/**
+	 * Returns automatic connection recovery interval in milliseconds.
+	 * @return how long will automatic recovery wait before attempting to reconnect, in ms; default is 5000
+	 */
+	public Integer getNetworkRecoveryInterval() {
+		return networkRecoveryInterval;
+	}
+
+	/**
+	 * Returns true if automatic connection recovery is enabled, false otherwise
+	 * @return true if automatic connection recovery is enabled, false otherwise
+	 */
+	public Boolean isAutomaticRecovery() {
+		return automaticRecovery;
+	}
+
+	/**
+	 * Returns true if topology recovery is enabled, false otherwise
+	 * @return true if topology recovery is enabled, false otherwise
+	 */
+	public Boolean isTopologyRecovery() {
+		return topologyRecovery;
+	}
+
+	/**
+	 * Retrieve the connection timeout.
+	 * @return the connection timeout, in milliseconds; zero for infinite
+	 */
+	public Integer getConnectionTimeout() {
+		return connectionTimeout;
+	}
+
+	/**
+	 * Retrieve the requested maximum channel number
+	 * @return the initially requested maximum channel number; zero for unlimited
+	 */
+	public Integer getRequestedChannelMax() {
+		return requestedChannelMax;
+	}
+
+	/**
+	 * Retrieve the requested maximum frame size
+	 * @return the initially requested maximum frame size, in octets; zero for unlimited
+	 */
+	public Integer getRequestedFrameMax() {
+		return requestedFrameMax;
+	}
+
+	/**
+	 * Retrieve the requested heartbeat interval.
+	 * @return the initially requested heartbeat interval, in seconds; zero for none
+	 */
+	public Integer getRequestedHeartbeat() {
+		return requestedHeartbeat;
+	}
+
+	/**
+	 *
+	 * @return Connection Factory for RMQ
+	 * @throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException if Malformed URI has been passed
+     */
+	public ConnectionFactory getConnectionFactory() throws URISyntaxException,
+		NoSuchAlgorithmException, KeyManagementException {
+		ConnectionFactory factory = new ConnectionFactory();
+		if (this.uri != null && !this.uri.isEmpty()){
+			try {
+				factory.setUri(getUri());
+			}catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e){
+				LOG.error("Failed to parse uri {}", e.getMessage());
+				throw e;
+			}
+		} else {
+			factory.setHost(this.host);
+			factory.setPort(this.port);
+			factory.setVirtualHost(this.virtualHost);
+			factory.setUsername(this.username);
+			factory.setPassword(this.password);
+		}
+
+		if (this.automaticRecovery != null) {
+			factory.setAutomaticRecoveryEnabled(this.automaticRecovery);
+		}
+		if (this.connectionTimeout != null) {
+			factory.setConnectionTimeout(this.connectionTimeout);
+		}
+		if (this.networkRecoveryInterval != null) {
+			factory.setNetworkRecoveryInterval(this.networkRecoveryInterval);
+		}
+		if (this.requestedHeartbeat != null) {
+			factory.setRequestedHeartbeat(this.requestedHeartbeat);
+		}
+		if (this.topologyRecovery != null) {
+			factory.setTopologyRecoveryEnabled(this.topologyRecovery);
+		}
+		if (this.requestedChannelMax != null) {
+			factory.setRequestedChannelMax(this.requestedChannelMax);
+		}
+		if (this.requestedFrameMax != null) {
+			factory.setRequestedFrameMax(this.requestedFrameMax);
+		}
+
+		return factory;
+	}
+
+	/**
+	 * The Builder Class for {@link RMQConnectionConfig}
+	 */
+	public static class Builder {
+
+		private String host;
+		private Integer port;
+		private String virtualHost;
+		private String username;
+		private String password;
+
+		private Integer networkRecoveryInterval;
+		private Boolean automaticRecovery;
+		private Boolean topologyRecovery;
+
+		private Integer connectionTimeout;
+		private Integer requestedChannelMax;
+		private Integer requestedFrameMax;
+		private Integer requestedHeartbeat;
+
+		private String uri;
+
+		/**
+		 * Set the target port.
+		 * @param port the default port to use for connections
+		 * @return the Builder
+		 */
+		public Builder setPort(int port) {
+			this.port = port;
+			return this;
+		}
+
+		/** @param host the default host to use for connections
+		 * @return the Builder
+		 */
+		public Builder setHost(String host) {
+			this.host = host;
+			return this;
+		}
+
+		/**
+		 * Set the virtual host.
+		 * @param virtualHost the virtual host to use when connecting to the broker
+		 * @return the Builder
+		 */
+		public Builder setVirtualHost(String virtualHost) {
+			this.virtualHost = virtualHost;
+			return this;
+		}
+
+		/**
+		 * Set the user name.
+		 * @param username the AMQP user name to use when connecting to the broker
+		 * @return the Builder
+		 */
+		public Builder setUserName(String username) {
+			this.username = username;
+			return this;
+		}
+
+		/**
+		 * Set the password.
+		 * @param password the password to use when connecting to the broker
+		 * @return the Builder
+		 */
+		public Builder setPassword(String password) {
+			this.password = password;
+			return this;
+		}
+
+		/**
+		 * Convenience method for setting the fields in an AMQP URI: host,
+		 * port, username, password and virtual host.  If any part of the
+		 * URI is omitted, the ConnectionFactory's corresponding variable
+		 * is left unchanged.
+		 * @param uri is the AMQP URI containing the data
+		 * @return the Builder
+		 */
+		public Builder setUri(String uri) {
+			this.uri = uri;
+			return this;
+		}
+
+		/**
+		 * Enables or disables topology recovery
+		 * @param topologyRecovery if true, enables topology recovery
+		 * @return the Builder
+		 */
+		public Builder setTopologyRecoveryEnabled(boolean topologyRecovery) {
+			this.topologyRecovery = topologyRecovery;
+			return this;
+		}
+
+		/**
+		 * Set the requested heartbeat.
+		 * @param requestedHeartbeat the initially requested heartbeat interval, in seconds; zero for none
+		 * @return the Builder
+		 */
+		public Builder setRequestedHeartbeat(int requestedHeartbeat) {
+			this.requestedHeartbeat = requestedHeartbeat;
+			return this;
+		}
+
+		/**
+		 * Set the requested maximum frame size
+		 * @param requestedFrameMax initially requested maximum frame size, in octets; zero for unlimited
+		 * @return the Builder
+		 */
+		public Builder setRequestedFrameMax(int requestedFrameMax) {
+			this.requestedFrameMax = requestedFrameMax;
+			return this;
+		}
+
+		/**
+		 * Set the requested maximum channel number
+		 * @param requestedChannelMax initially requested maximum channel number; zero for unlimited
+		 */
+		public Builder setRequestedChannelMax(int requestedChannelMax) {
+			this.requestedChannelMax = requestedChannelMax;
+			return this;
+		}
+
+		/**
+		 * Sets connection recovery interval. Default is 5000.
+		 * @param networkRecoveryInterval how long will automatic recovery wait before attempting to reconnect, in ms
+		 * @return the Builder
+		 */
+		public Builder setNetworkRecoveryInterval(int networkRecoveryInterval) {
+			this.networkRecoveryInterval = networkRecoveryInterval;
+			return this;
+		}
+
+		/**
+		 * Set the connection timeout.
+		 * @param connectionTimeout connection establishment timeout in milliseconds; zero for infinite
+		 * @return the Builder
+		 */
+		public Builder setConnectionTimeout(int connectionTimeout) {
+			this.connectionTimeout = connectionTimeout;
+			return this;
+		}
+
+		/**
+		 * Enables or disables automatic connection recovery
+		 * @param automaticRecovery if true, enables connection recovery
+		 * @return the Builder
+		 */
+		public Builder setAutomaticRecovery(boolean automaticRecovery) {
+			this.automaticRecovery = automaticRecovery;
+			return this;
+		}
+
+		/**
+		 * The Builder method
+		 * If URI is NULL we use host, port, vHost, username, password combination
+		 * to initialize connection. using  {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String,
+		 * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}
+		 *
+		 * else URI will be used to initialize the client connection
+		 * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}
+		 * @return RMQConnectionConfig
+         */
+		public RMQConnectionConfig build(){
+			if(this.uri != null) {
+				return new RMQConnectionConfig(this.uri, this.networkRecoveryInterval,
+					this.automaticRecovery, this.topologyRecovery, this.connectionTimeout, this.requestedChannelMax,
+					this.requestedFrameMax, this.requestedHeartbeat);
+			} else {
+				return new RMQConnectionConfig(this.host, this.port, this.virtualHost, this.username, this.password,
+					this.networkRecoveryInterval, this.automaticRecovery, this.topologyRecovery,
+					this.connectionTimeout, this.requestedChannelMax, this.requestedFrameMax, this.requestedHeartbeat);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 21f185f..31128a9 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.state.SerializedCheckpointData;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.junit.After;
 import org.junit.Before;
@@ -220,11 +221,12 @@ public class RMQSourceTest {
 	 * Tests whether constructor params are passed correctly.
 	 */
 	@Test
-	public void testConstructorParams() {
+	public void testConstructorParams() throws Exception {
 		// verify construction params
+		RMQConnectionConfig.Builder builder = new RMQConnectionConfig.Builder();
+		builder.setHost("hostTest").setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/");
 		ConstructorTestClass testObj = new ConstructorTestClass(
-			"hostTest", 999, "userTest", "passTest",
-			"queueTest", false, new StringDeserializationScheme());
+			builder.build(), "queueTest", false, new StringDeserializationScheme());
 
 		try {
 			testObj.open(new Configuration());
@@ -240,17 +242,16 @@ public class RMQSourceTest {
 
 	private static class ConstructorTestClass extends RMQSource<String> {
 
-		private ConnectionFactory factory = Mockito.spy(new ConnectionFactory());
-
-		public ConstructorTestClass(String hostName, Integer port,
-				String username,
-				String password,
-				String queueName,
-				boolean usesCorrelationId,
-				DeserializationSchema<String> deserializationSchema) {
-			super(hostName, port, username, password,
-				queueName, usesCorrelationId, deserializationSchema);
+		private ConnectionFactory factory;
 
+		public ConstructorTestClass(RMQConnectionConfig rmqConnectionConfig,
+									String queueName,
+									boolean usesCorrelationId,
+									DeserializationSchema<String> deserializationSchema) throws Exception {
+			super(rmqConnectionConfig, queueName, usesCorrelationId, deserializationSchema);
+			RMQConnectionConfig.Builder builder = new RMQConnectionConfig.Builder();
+			builder.setHost("hostTest").setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/");
+			factory = Mockito.spy(builder.build().getConnectionFactory());
 			try {
 				Mockito.doThrow(new RuntimeException()).when(factory).newConnection();
 			} catch (IOException e) {
@@ -295,7 +296,9 @@ public class RMQSourceTest {
 	private class RMQTestSource extends RMQSource<String> {
 
 		public RMQTestSource() {
-			super("hostDummy", -1, "", "", "queueDummy", true, new StringDeserializationScheme());
+			super(new RMQConnectionConfig.Builder().setHost("hostTest")
+					.setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/").build()
+				, "queueDummy", true, new StringDeserializationScheme());
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
new file mode 100644
index 0000000..40985ce
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.rabbitmq.client.ConnectionFactory;
+import org.junit.Test;
+
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class RMQConnectionConfigTest {
+	// Unit tests for RMQConnectionConfig. First case: no host is configured, so a NullPointerException is expected.
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointExceptionIfHostIsNull() throws NoSuchAlgorithmException,
+		KeyManagementException, URISyntaxException {
+		RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+			.setPort(1000).setUserName("guest")
+			.setPassword("guest").setVirtualHost("/").build();
+		connectionConfig.getConnectionFactory();
+	}
+	@Test(expected = NullPointerException.class) // no port is configured in this case
+	public void shouldThrowNullPointExceptionIfPortIsNull() throws NoSuchAlgorithmException,
+		KeyManagementException, URISyntaxException {
+		RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+			.setHost("localhost").setUserName("guest")
+			.setPassword("guest").setVirtualHost("/").build();
+		connectionConfig.getConnectionFactory();
+	}
+	// Host and port are both set and no timeout is given, so the factory should report the client default.
+	@Test
+	public void shouldSetDefaultValueIfConnectionTimeoutNotGiven() throws NoSuchAlgorithmException,
+		KeyManagementException, URISyntaxException {
+		RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+			.setHost("localhost").setPort(5000).setUserName("guest")
+			.setPassword("guest").setVirtualHost("/").build();
+		ConnectionFactory factory = connectionConfig.getConnectionFactory();
+		assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, factory.getConnectionTimeout());
+	}
+	// Despite the "NotGiven" in its name, this case sets an explicit timeout and checks the factory reports it.
+	@Test
+	public void shouldSetProvidedValueIfConnectionTimeoutNotGiven() throws NoSuchAlgorithmException,
+		KeyManagementException, URISyntaxException {
+		RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+			.setHost("localhost").setPort(5000).setUserName("guest")
+			.setPassword("guest").setVirtualHost("/")
+			.setConnectionTimeout(5000).build();
+		ConnectionFactory factory = connectionConfig.getConnectionFactory();
+		assertEquals(5000, factory.getConnectionTimeout());
+	}
+}