Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/06/01 05:28:13 UTC
[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters
[ https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15309297#comment-15309297 ]
ASF GitHub Bot commented on FLINK-3763:
---------------------------------------
Github user subhankarb commented on a diff in the pull request:
https://github.com/apache/flink/pull/2054#discussion_r65303646
--- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java ---
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.google.common.base.Preconditions;
+import com.rabbitmq.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Connection Configuration for RMQ.
+ * If {@link Builder#setUri(String)} has been set, then {@link RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, int, int, int)}
+ * will be used to initialize the RMQ connection; otherwise
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, String, String, int, boolean, boolean, int, int, int, int)}
+ * will be used to initialize the RMQ connection.
+ */
+public class RMQConnectionConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class);
+
+ private String host;
+ private int port;
+ private String virtualHost;
+ private String username;
+ private String password;
+ private String uri;
+
+ private int networkRecoveryInterval;
+ private boolean automaticRecovery;
+ private boolean topologyRecovery;
+
+ private int connectionTimeout;
+ private int requestedChannelMax;
+ private int requestedFrameMax;
+ private int requestedHeartbeat;
+
+ /**
+ *
+ * @param host host name
+ * @param port port
+ * @param virtualHost virtual host
+ * @param username username
+ * @param password password
+ *
+ * @param networkRecoveryInterval connection recovery interval in milliseconds
+ * @param automaticRecovery if automatic connection recovery
+ * @param topologyRecovery if topology recovery
+ * @param connectionTimeout connection timeout
+ * @param requestedChannelMax requested maximum channel number
+ * @param requestedFrameMax requested maximum frame size
+ * @param requestedHeartbeat requested heartbeat interval
+ * @throws NullPointerException if host or virtual host or username or password is null
+ */
+ private RMQConnectionConfig(String host, int port, String virtualHost, String username, String password,
+ int networkRecoveryInterval, boolean automaticRecovery,
+ boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax,
+ int requestedHeartbeat){
+ Preconditions.checkNotNull(host, "host can not be null");
+ Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");
+ Preconditions.checkNotNull(username, "username can not be null");
+ Preconditions.checkNotNull(password, "password can not be null");
+ this.host = host;
+ this.port = port;
+ this.virtualHost = virtualHost;
+ this.username = username;
+ this.password = password;
+
+ this.networkRecoveryInterval = networkRecoveryInterval;
+ this.automaticRecovery = automaticRecovery;
+ this.topologyRecovery = topologyRecovery;
+ this.connectionTimeout = connectionTimeout;
+ this.requestedChannelMax = requestedChannelMax;
+ this.requestedFrameMax = requestedFrameMax;
+ this.requestedHeartbeat = requestedHeartbeat;
+ }
+
+ /**
+ *
+ * @param uri the connection URI
+ * @param networkRecoveryInterval connection recovery interval in milliseconds
+ * @param automaticRecovery if automatic connection recovery
+ * @param topologyRecovery if topology recovery
+ * @param connectionTimeout connection timeout
+ * @param requestedChannelMax requested maximum channel number
+ * @param requestedFrameMax requested maximum frame size
+ * @param requestedHeartbeat requested heartbeat interval
+ * @throws NullPointerException if URI is null
+ */
+ private RMQConnectionConfig(String uri, int networkRecoveryInterval, boolean automaticRecovery,
+ boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax,
+ int requestedHeartbeat){
+ Preconditions.checkNotNull(uri, "Uri can not be null");
+ this.uri = uri;
+
+ this.networkRecoveryInterval = networkRecoveryInterval;
+ this.automaticRecovery = automaticRecovery;
+ this.topologyRecovery = topologyRecovery;
+ this.connectionTimeout = connectionTimeout;
+ this.requestedChannelMax = requestedChannelMax;
+ this.requestedFrameMax = requestedFrameMax;
+ this.requestedHeartbeat = requestedHeartbeat;
+ }
+
+ /** @return the host to use for connections */
+ public String getHost() {
+ return host;
+ }
+
+ /** @return the port to use for connections */
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Retrieve the virtual host.
+ * @return the virtual host to use when connecting to the broker
+ */
+ public String getVirtualHost() {
+ return virtualHost;
+ }
+
+ /**
+ * Retrieve the user name.
+ * @return the AMQP user name to use when connecting to the broker
+ */
+ public String getUsername() {
+ return username;
+ }
+
+ /**
+ * Retrieve the password.
+ * @return the password to use when connecting to the broker
+ */
+ public String getPassword() {
+ return password;
+ }
+
+ /**
+ * Retrieve the URI.
+ * @return the connection URI when connecting to the broker
+ */
+ public String getUri() {
+ return uri;
+ }
+
+ /**
+ * Returns automatic connection recovery interval in milliseconds.
+ * @return how long will automatic recovery wait before attempting to reconnect, in ms; default is 5000
+ */
+ public int getNetworkRecoveryInterval() {
+ return networkRecoveryInterval;
+ }
+
+ /**
+ * Returns true if automatic connection recovery is enabled, false otherwise
+ * @return true if automatic connection recovery is enabled, false otherwise
+ */
+ public boolean isAutomaticRecovery() {
+ return automaticRecovery;
+ }
+
+ /**
+ * Returns true if topology recovery is enabled, false otherwise
+ * @return true if topology recovery is enabled, false otherwise
+ */
+ public boolean isTopologyRecovery() {
+ return topologyRecovery;
+ }
+
+ /**
+ * Retrieve the connection timeout.
+ * @return the connection timeout, in milliseconds; zero for infinite
+ */
+ public int getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ /**
+ * Retrieve the requested maximum channel number
+ * @return the initially requested maximum channel number; zero for unlimited
+ */
+ public int getRequestedChannelMax() {
+ return requestedChannelMax;
+ }
+
+ /**
+ * Retrieve the requested maximum frame size
+ * @return the initially requested maximum frame size, in octets; zero for unlimited
+ */
+ public int getRequestedFrameMax() {
+ return requestedFrameMax;
+ }
+
+ /**
+ * Retrieve the requested heartbeat interval.
+ * @return the initially requested heartbeat interval, in seconds; zero for none
+ */
+ public int getRequestedHeartbeat() {
+ return requestedHeartbeat;
+ }
+
+ /**
+ *
+ * @return Connection Factory for RMQ
+ * @throws Exception if Malformed URI has been passed
+ */
+ public ConnectionFactory getConnectionFactory() throws Exception {
+ ConnectionFactory factory = new ConnectionFactory();
+ if (this.uri != null && !this.uri.isEmpty()){
+ try {
+ factory.setUri(getUri());
+ }catch (Exception e){
+ LOG.error("Failed to parse uri {}", e.getMessage());
+ throw e;
+ }
+ } else {
+ factory.setHost(getHost());
+ factory.setPort(getPort());
+ factory.setVirtualHost(getVirtualHost());
+ factory.setUsername(getUsername());
+ factory.setPassword(getPassword());
+ }
+
+ factory.setAutomaticRecoveryEnabled(isAutomaticRecovery());
+ factory.setConnectionTimeout(getConnectionTimeout());
+ factory.setNetworkRecoveryInterval(getNetworkRecoveryInterval());
+ factory.setRequestedHeartbeat(getRequestedHeartbeat());
+ factory.setTopologyRecoveryEnabled(isTopologyRecovery());
+ factory.setRequestedChannelMax(getRequestedChannelMax());
+ factory.setRequestedFrameMax(getRequestedFrameMax());
+
+ return factory;
+ }
+
+ public static class Builder {
+ /** The default host */
+ public static final String DEFAULT_HOST = "localhost";
--- End diff --
I would love to use solution a, but
```java
private int networkRecoveryInterval = 5000;
private boolean automaticRecovery = false;
private boolean topologyRecovery = true;
```
These three are not public in **ConnectionFactory**, so we would have to use a null check for these three. So I think it is better to go with solution b. :)
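A minimal sketch of what the null-check approach (solution b) might look like, assuming the three recovery settings are kept as nullable boxed fields and are only pushed to the factory when the user actually set them, so that the client's own defaults survive otherwise. `StubConnectionFactory` is a hypothetical stand-in for `com.rabbitmq.client.ConnectionFactory` (so the snippet compiles without the client jar), and `RecoverySettings` and `applyTo` are illustrative names, not part of the pull request:

```java
/** Hypothetical stand-in for com.rabbitmq.client.ConnectionFactory, mirroring the three defaults quoted above. */
class StubConnectionFactory {
    private int networkRecoveryInterval = 5000;
    private boolean automaticRecovery = false;
    private boolean topologyRecovery = true;

    void setNetworkRecoveryInterval(int millis) { this.networkRecoveryInterval = millis; }
    void setAutomaticRecoveryEnabled(boolean enabled) { this.automaticRecovery = enabled; }
    void setTopologyRecoveryEnabled(boolean enabled) { this.topologyRecovery = enabled; }

    // Getters are on the stub only so the result can be inspected.
    int getNetworkRecoveryInterval() { return networkRecoveryInterval; }
    boolean isAutomaticRecoveryEnabled() { return automaticRecovery; }
    boolean isTopologyRecoveryEnabled() { return topologyRecovery; }
}

/** Illustrative holder: a null field means "not set by the user". */
class RecoverySettings implements java.io.Serializable {
    private static final long serialVersionUID = 1L;

    private final Integer networkRecoveryInterval;
    private final Boolean automaticRecovery;
    private final Boolean topologyRecovery;

    RecoverySettings(Integer networkRecoveryInterval, Boolean automaticRecovery, Boolean topologyRecovery) {
        this.networkRecoveryInterval = networkRecoveryInterval;
        this.automaticRecovery = automaticRecovery;
        this.topologyRecovery = topologyRecovery;
    }

    /** Copies only the values that were set; unset values keep the factory's defaults. */
    StubConnectionFactory applyTo(StubConnectionFactory factory) {
        if (networkRecoveryInterval != null) {
            factory.setNetworkRecoveryInterval(networkRecoveryInterval);
        }
        if (automaticRecovery != null) {
            factory.setAutomaticRecoveryEnabled(automaticRecovery);
        }
        if (topologyRecovery != null) {
            factory.setTopologyRecoveryEnabled(topologyRecovery);
        }
        return factory;
    }
}
```

With this shape, `new RecoverySettings(null, true, null).applyTo(new StubConnectionFactory())` would turn on automatic recovery and leave the other two values at whatever the factory was constructed with, without the config class having to duplicate those defaults.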
> RabbitMQ Source/Sink standardize connection parameters
> ------------------------------------------------------
>
> Key: FLINK-3763
> URL: https://issues.apache.org/jira/browse/FLINK-3763
> Project: Flink
> Issue Type: Improvement
> Components: Streaming Connectors
> Affects Versions: 1.0.1
> Reporter: Robert Batts
> Assignee: Subhankar Biswas
>
> The RabbitMQ source and sink should have the same capabilities in terms of establishing a connection. Currently the sink is lacking connection parameters that are available on the source. Additionally, VirtualHost should be an offered parameter for multi-tenant RabbitMQ clusters (if not specified it goes to the vhost '/').
> Connection Parameters
> ===================
> - Host - Offered on both
> - Port - Source only
> - Virtual Host - Neither
> - User - Source only
> - Password - Source only
> Additionally, it might be worth offering the URI as a valid constructor because that would offer all 5 of the above parameters in a single String.
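To illustrate the point in the issue description that a single URI can carry all five parameters, here is a rough sketch that splits an AMQP-style URI with the JDK's `java.net.URI`. The class name `AmqpUriParts`, the sample host and the credentials are made up for illustration, and the handling is simplified: a missing path is mapped to the default vhost '/', and no validation is done. The connector itself would hand the string to the RabbitMQ client rather than parse it like this.

```java
import java.net.URI;

/** Illustrative only: pulls host, port, vhost, user and password out of one URI string. */
class AmqpUriParts {
    final String host;
    final int port;
    final String virtualHost;
    final String username;
    final String password;

    AmqpUriParts(String uri) {
        URI parsed = URI.create(uri);
        this.host = parsed.getHost();
        this.port = parsed.getPort(); // -1 when the URI has no explicit port

        // The user info has the form "user:password"; either part may be absent.
        String userInfo = parsed.getUserInfo();
        if (userInfo == null) {
            this.username = null;
            this.password = null;
        } else {
            String[] parts = userInfo.split(":", 2);
            this.username = parts[0];
            this.password = parts.length > 1 ? parts[1] : null;
        }

        // The vhost is the path without its leading slash; no path means the default vhost.
        String path = parsed.getPath();
        this.virtualHost = (path == null || path.isEmpty()) ? "/" : path.substring(1);
    }
}
```

For example, `new AmqpUriParts("amqp://guest:secret@broker.example.com:5672/tenant-a")` yields host `broker.example.com`, port `5672`, vhost `tenant-a`, user `guest` and password `secret`.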
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)