You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/06/01 05:28:13 UTC

[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters

    [ https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15309297#comment-15309297 ] 

ASF GitHub Bot commented on FLINK-3763:
---------------------------------------

Github user subhankarb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2054#discussion_r65303646
  
    --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java ---
    @@ -0,0 +1,455 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.rabbitmq.common;
    +
    +import com.google.common.base.Preconditions;
    +import com.rabbitmq.client.ConnectionFactory;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Serializable;
    +
    +/**
    + * Connection Configuration for RMQ.
    + * If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, int, int, int)}
    + * will be used for initialize the RMQ connection or
    + * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, String, String, int, boolean, boolean, int, int, int, int)}
    + * will be used for initialize the RMQ connection
    + */
    +public class RMQConnectionConfig implements Serializable {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class);
    +
    +	private String host;
    +	private int port;
    +	private String virtualHost;
    +	private String username;
    +	private String password;
    +	private String uri;
    +
    +	private int networkRecoveryInterval;
    +	private boolean automaticRecovery;
    +	private boolean topologyRecovery;
    +
    +	private int connectionTimeout;
    +	private int requestedChannelMax;
    +	private int requestedFrameMax;
    +	private int requestedHeartbeat;
    +
    +	/**
    +	 *
    +	 * @param host host name
    +	 * @param port port
    +	 * @param virtualHost virtual host
    +	 * @param username username
    +	 * @param password password
    +
    +	 * @param networkRecoveryInterval connection recovery interval in milliseconds
    +	 * @param automaticRecovery if automatic connection recovery
    +	 * @param topologyRecovery if topology recovery
    +	 * @param connectionTimeout connection timeout
    +	 * @param requestedChannelMax requested maximum channel number
    +	 * @param requestedFrameMax requested maximum frame size
    +	 * @param requestedHeartbeat requested heartbeat interval
    +	 * @throws NullPointerException if host or virtual host or username or password is null
    +     */
    +	private RMQConnectionConfig(String host, int port, String virtualHost, String username, String password,
    +								int networkRecoveryInterval, boolean automaticRecovery,
    +								boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax,
    +								int requestedHeartbeat){
    +		Preconditions.checkNotNull(host, "host can not be null");
    +		Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");
    +		Preconditions.checkNotNull(username, "username can not be null");
    +		Preconditions.checkNotNull(password, "password can not be null");
    +		this.host = host;
    +		this.port = port;
    +		this.virtualHost = virtualHost;
    +		this.username = username;
    +		this.password = password;
    +
    +		this.networkRecoveryInterval = networkRecoveryInterval;
    +		this.automaticRecovery = automaticRecovery;
    +		this.topologyRecovery = topologyRecovery;
    +		this.connectionTimeout = connectionTimeout;
    +		this.requestedChannelMax = requestedChannelMax;
    +		this.requestedFrameMax = requestedFrameMax;
    +		this.requestedHeartbeat = requestedHeartbeat;
    +	}
    +
    +	/**
    +	 *
    +	 * @param uri the connection URI
    +	 * @param networkRecoveryInterval connection recovery interval in milliseconds
    +	 * @param automaticRecovery if automatic connection recovery
    +	 * @param topologyRecovery if topology recovery
    +	 * @param connectionTimeout connection timeout
    +	 * @param requestedChannelMax requested maximum channel number
    +     * @param requestedFrameMax requested maximum frame size
    +     * @param requestedHeartbeat requested heartbeat interval
    +	 * @throws NullPointerException if URI is null
    +     */
    +	private RMQConnectionConfig(String uri, int networkRecoveryInterval, boolean automaticRecovery,
    +								boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax,
    +								int requestedHeartbeat){
    +		Preconditions.checkNotNull(uri, "Uri can not be null");
    +		this.uri = uri;
    +
    +		this.networkRecoveryInterval = networkRecoveryInterval;
    +		this.automaticRecovery = automaticRecovery;
    +		this.topologyRecovery = topologyRecovery;
    +		this.connectionTimeout = connectionTimeout;
    +		this.requestedChannelMax = requestedChannelMax;
    +		this.requestedFrameMax = requestedFrameMax;
    +		this.requestedHeartbeat = requestedHeartbeat;
    +	}
    +
    +	/** @return the host to use for connections */
    +	public String getHost() {
    +		return host;
    +	}
    +
    +	/** @return the port to use for connections */
    +	public int getPort() {
    +		return port;
    +	}
    +
    +	/**
    +	 * Retrieve the virtual host.
    +	 * @return the virtual host to use when connecting to the broker
    +	 */
    +	public String getVirtualHost() {
    +		return virtualHost;
    +	}
    +
    +	/**
    +	 * Retrieve the user name.
    +	 * @return the AMQP user name to use when connecting to the broker
    +	 */
    +	public String getUsername() {
    +		return username;
    +	}
    +
    +	/**
    +	 * Retrieve the password.
    +	 * @return the password to use when connecting to the broker
    +	 */
    +	public String getPassword() {
    +		return password;
    +	}
    +
    +	/**
    +	 * Retrieve the URI.
    +	 * @return the connection URI when connecting to the broker
    +     */
    +	public String getUri() {
    +		return uri;
    +	}
    +
    +	/**
    +	 * Returns automatic connection recovery interval in milliseconds.
    +	 * @return how long will automatic recovery wait before attempting to reconnect, in ms; default is 5000
    +	 */
    +	public int getNetworkRecoveryInterval() {
    +		return networkRecoveryInterval;
    +	}
    +
    +	/**
    +	 * Returns true if automatic connection recovery is enabled, false otherwise
    +	 * @return true if automatic connection recovery is enabled, false otherwise
    +	 */
    +	public boolean isAutomaticRecovery() {
    +		return automaticRecovery;
    +	}
    +
    +	/**
    +	 * Returns true if topology recovery is enabled, false otherwise
    +	 * @return true if topology recovery is enabled, false otherwise
    +	 */
    +	public boolean isTopologyRecovery() {
    +		return topologyRecovery;
    +	}
    +
    +	/**
    +	 * Retrieve the connection timeout.
    +	 * @return the connection timeout, in milliseconds; zero for infinite
    +	 */
    +	public int getConnectionTimeout() {
    +		return connectionTimeout;
    +	}
    +
    +	/**
    +	 * Retrieve the requested maximum channel number
    +	 * @return the initially requested maximum channel number; zero for unlimited
    +	 */
    +	public int getRequestedChannelMax() {
    +		return requestedChannelMax;
    +	}
    +
    +	/**
    +	 * Retrieve the requested maximum frame size
    +	 * @return the initially requested maximum frame size, in octets; zero for unlimited
    +	 */
    +	public int getRequestedFrameMax() {
    +		return requestedFrameMax;
    +	}
    +
    +	/**
    +	 * Retrieve the requested heartbeat interval.
    +	 * @return the initially requested heartbeat interval, in seconds; zero for none
    +	 */
    +	public int getRequestedHeartbeat() {
    +		return requestedHeartbeat;
    +	}
    +
    +	/**
    +	 *
    +	 * @return Connection Factory for RMQ
    +	 * @throws Exception if Malformed URI has been passed
    +     */
    +	public ConnectionFactory getConnectionFactory() throws Exception {
    +		ConnectionFactory factory = new ConnectionFactory();
    +		if (this.uri != null && !this.uri.isEmpty()){
    +			try {
    +				factory.setUri(getUri());
    +			}catch (Exception e){
    +				LOG.error("Failed to parse uri {}", e.getMessage());
    +				throw e;
    +			}
    +		} else {
    +			factory.setHost(getHost());
    +			factory.setPort(getPort());
    +			factory.setVirtualHost(getVirtualHost());
    +			factory.setUsername(getUsername());
    +			factory.setPassword(getPassword());
    +		}
    +
    +		factory.setAutomaticRecoveryEnabled(isAutomaticRecovery());
    +		factory.setConnectionTimeout(getConnectionTimeout());
    +		factory.setNetworkRecoveryInterval(getNetworkRecoveryInterval());
    +		factory.setRequestedHeartbeat(getRequestedHeartbeat());
    +		factory.setTopologyRecoveryEnabled(isTopologyRecovery());
    +		factory.setRequestedChannelMax(getRequestedChannelMax());
    +		factory.setRequestedFrameMax(getRequestedFrameMax());
    +
    +		return factory;
    +	}
    +
    +	public static class Builder {
    +		/** The default host */
    +		public static final String DEFAULT_HOST = "localhost";
    --- End diff --
    
    I would love to use solution a. but 
    ```java
    private int networkRecoveryInterval  = 5000;
    private boolean automaticRecovery  = false;
    private boolean topologyRecovery    = true;
    ```
    these three are not public in **ConnectionFactory**  so we can have to use the null check for these three. so better i am going with solution b. :)


> RabbitMQ Source/Sink standardize connection parameters
> ------------------------------------------------------
>
>                 Key: FLINK-3763
>                 URL: https://issues.apache.org/jira/browse/FLINK-3763
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming Connectors
>    Affects Versions: 1.0.1
>            Reporter: Robert Batts
>            Assignee: Subhankar Biswas
>
> The RabbitMQ source and sink should have the same capabilities in terms of establishing a connection, currently the sink is lacking connection parameters that are available on the source. Additionally, VirtualHost should be an offered parameter for multi-tenant RabbitMQ clusters (if not specified it goes to the vhost '/').
> Connection Parameters
> ===================
> - Host - Offered on both
> - Port - Source only
> - Virtual Host - Neither
> - User - Source only
> - Password - Source only
> Additionally, it might be worth offer the URI as a valid constructor because that would offer all 5 of the above parameters in a single String.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)