You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:14 UTC
[24/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
new file mode 100644
index 0000000..72bac1c
--- /dev/null
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.rabbitmq.client.ConnectionFactory;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * Connection Configuration for RMQ.
+ * If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer,
+ * Boolean, Boolean, Integer, Integer, Integer, Integer)}
+ * will be used to initialize the RMQ connection; otherwise
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String, Integer, Boolean,
+ * Boolean, Integer, Integer, Integer, Integer)}
+ * will be used to initialize the RMQ connection.
+ */
+public class RMQConnectionConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class);
+
+ private String host;
+ private Integer port;
+ private String virtualHost;
+ private String username;
+ private String password;
+ private String uri;
+
+ private Integer networkRecoveryInterval;
+ private Boolean automaticRecovery;
+ private Boolean topologyRecovery;
+
+ private Integer connectionTimeout;
+ private Integer requestedChannelMax;
+ private Integer requestedFrameMax;
+ private Integer requestedHeartbeat;
+
+ /**
+ * Creates a configuration that connects using an explicit host, port, virtual host and credentials.
+ *
+ * @param host host name
+ * @param port port
+ * @param virtualHost virtual host
+ * @param username username
+ * @param password password
+ * @param networkRecoveryInterval connection recovery interval in milliseconds
+ * @param automaticRecovery if automatic connection recovery
+ * @param topologyRecovery if topology recovery
+ * @param connectionTimeout connection timeout
+ * @param requestedChannelMax requested maximum channel number
+ * @param requestedFrameMax requested maximum frame size
+ * @param requestedHeartbeat requested heartbeat interval
+ * @throws NullPointerException if host, port, virtual host, username or password is null
+ */
+ private RMQConnectionConfig(String host, Integer port, String virtualHost, String username, String password,
+ Integer networkRecoveryInterval, Boolean automaticRecovery,
+ Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
+ Integer requestedFrameMax, Integer requestedHeartbeat){
+ // Mandatory settings: validate and store in one step (checkNotNull returns its argument).
+ this.host = Preconditions.checkNotNull(host, "host can not be null");
+ this.port = Preconditions.checkNotNull(port, "port can not be null");
+ this.virtualHost = Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");
+ this.username = Preconditions.checkNotNull(username, "username can not be null");
+ this.password = Preconditions.checkNotNull(password, "password can not be null");
+
+ // Optional recovery settings; a null value is skipped later by getConnectionFactory().
+ this.automaticRecovery = automaticRecovery;
+ this.topologyRecovery = topologyRecovery;
+ this.networkRecoveryInterval = networkRecoveryInterval;
+
+ // Optional connection limits; a null value is skipped later by getConnectionFactory().
+ this.connectionTimeout = connectionTimeout;
+ this.requestedHeartbeat = requestedHeartbeat;
+ this.requestedChannelMax = requestedChannelMax;
+ this.requestedFrameMax = requestedFrameMax;
+ }
+
+ /**
+ * Creates a configuration that connects using a single connection URI.
+ *
+ * @param uri the connection URI
+ * @param networkRecoveryInterval connection recovery interval in milliseconds
+ * @param automaticRecovery if automatic connection recovery
+ * @param topologyRecovery if topology recovery
+ * @param connectionTimeout connection timeout
+ * @param requestedChannelMax requested maximum channel number
+ * @param requestedFrameMax requested maximum frame size
+ * @param requestedHeartbeat requested heartbeat interval
+ * @throws NullPointerException if URI is null
+ */
+ private RMQConnectionConfig(String uri, Integer networkRecoveryInterval, Boolean automaticRecovery,
+ Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
+ Integer requestedFrameMax, Integer requestedHeartbeat){
+ // The URI is the only mandatory setting here (checkNotNull returns its argument).
+ this.uri = Preconditions.checkNotNull(uri, "Uri can not be null");
+
+ // Optional recovery settings; a null value is skipped later by getConnectionFactory().
+ this.automaticRecovery = automaticRecovery;
+ this.topologyRecovery = topologyRecovery;
+ this.networkRecoveryInterval = networkRecoveryInterval;
+
+ // Optional connection limits; a null value is skipped later by getConnectionFactory().
+ this.connectionTimeout = connectionTimeout;
+ this.requestedHeartbeat = requestedHeartbeat;
+ this.requestedChannelMax = requestedChannelMax;
+ this.requestedFrameMax = requestedFrameMax;
+ }
+
+ /** @return the host to use for connections */
+ public String getHost() {
+ return host;
+ }
+
+ /**
+ * Returns the port to use for connections.
+ *
+ * <p>The port is held as a nullable {@link Integer} and unboxed on return. The URI-based
+ * constructor never assigns it, so calling this method on a configuration created from a URI
+ * throws a {@link NullPointerException}.
+ *
+ * @return the port to use for connections
+ */
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Retrieve the virtual host.
+ * @return the virtual host to use when connecting to the broker
+ */
+ public String getVirtualHost() {
+ return virtualHost;
+ }
+
+ /**
+ * Retrieve the user name.
+ * @return the AMQP user name to use when connecting to the broker
+ */
+ public String getUsername() {
+ return username;
+ }
+
+ /**
+ * Retrieve the password.
+ * @return the password to use when connecting to the broker
+ */
+ public String getPassword() {
+ return password;
+ }
+
+ /**
+ * Retrieve the URI.
+ * @return the connection URI when connecting to the broker
+ */
+ public String getUri() {
+ return uri;
+ }
+
+ /**
+ * Returns automatic connection recovery interval in milliseconds.
+ * @return how long will automatic recovery wait before attempting to reconnect, in ms; default is 5000
+ */
+ public Integer getNetworkRecoveryInterval() {
+ return networkRecoveryInterval;
+ }
+
+ /**
+ * Returns true if automatic connection recovery is enabled, false otherwise
+ * @return true if automatic connection recovery is enabled, false otherwise
+ */
+ public Boolean isAutomaticRecovery() {
+ return automaticRecovery;
+ }
+
+ /**
+ * Returns true if topology recovery is enabled, false otherwise
+ * @return true if topology recovery is enabled, false otherwise
+ */
+ public Boolean isTopologyRecovery() {
+ return topologyRecovery;
+ }
+
+ /**
+ * Retrieve the connection timeout.
+ * @return the connection timeout, in milliseconds; zero for infinite
+ */
+ public Integer getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ /**
+ * Retrieve the requested maximum channel number
+ * @return the initially requested maximum channel number; zero for unlimited
+ */
+ public Integer getRequestedChannelMax() {
+ return requestedChannelMax;
+ }
+
+ /**
+ * Retrieve the requested maximum frame size
+ * @return the initially requested maximum frame size, in octets; zero for unlimited
+ */
+ public Integer getRequestedFrameMax() {
+ return requestedFrameMax;
+ }
+
+ /**
+ * Retrieve the requested heartbeat interval.
+ * @return the initially requested heartbeat interval, in seconds; zero for none
+ */
+ public Integer getRequestedHeartbeat() {
+ return requestedHeartbeat;
+ }
+
+ /**
+ *
+ * @return Connection Factory for RMQ
+ * @throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException if Malformed URI has been passed
+ */
+ public ConnectionFactory getConnectionFactory() throws URISyntaxException,
+ NoSuchAlgorithmException, KeyManagementException {
+ ConnectionFactory factory = new ConnectionFactory();
+ if (this.uri != null && !this.uri.isEmpty()){
+ try {
+ factory.setUri(this.uri);
+ } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e) {
+ LOG.error("Failed to parse uri", e);
+ throw e;
+ }
+ } else {
+ factory.setHost(this.host);
+ factory.setPort(this.port);
+ factory.setVirtualHost(this.virtualHost);
+ factory.setUsername(this.username);
+ factory.setPassword(this.password);
+ }
+
+ if (this.automaticRecovery != null) {
+ factory.setAutomaticRecoveryEnabled(this.automaticRecovery);
+ }
+ if (this.connectionTimeout != null) {
+ factory.setConnectionTimeout(this.connectionTimeout);
+ }
+ if (this.networkRecoveryInterval != null) {
+ factory.setNetworkRecoveryInterval(this.networkRecoveryInterval);
+ }
+ if (this.requestedHeartbeat != null) {
+ factory.setRequestedHeartbeat(this.requestedHeartbeat);
+ }
+ if (this.topologyRecovery != null) {
+ factory.setTopologyRecoveryEnabled(this.topologyRecovery);
+ }
+ if (this.requestedChannelMax != null) {
+ factory.setRequestedChannelMax(this.requestedChannelMax);
+ }
+ if (this.requestedFrameMax != null) {
+ factory.setRequestedFrameMax(this.requestedFrameMax);
+ }
+
+ return factory;
+ }
+
+ /**
+ * The Builder Class for {@link RMQConnectionConfig}
+ */
+ public static class Builder {
+
+ private String host;
+ private Integer port;
+ private String virtualHost;
+ private String username;
+ private String password;
+
+ private Integer networkRecoveryInterval;
+ private Boolean automaticRecovery;
+ private Boolean topologyRecovery;
+
+ private Integer connectionTimeout;
+ private Integer requestedChannelMax;
+ private Integer requestedFrameMax;
+ private Integer requestedHeartbeat;
+
+ private String uri;
+
+ /**
+ * Set the target port.
+ * @param port the default port to use for connections
+ * @return the Builder
+ */
+ public Builder setPort(int port) {
+ this.port = port;
+ return this;
+ }
+
+ /** @param host the default host to use for connections
+ * @return the Builder
+ */
+ public Builder setHost(String host) {
+ this.host = host;
+ return this;
+ }
+
+ /**
+ * Set the virtual host.
+ * @param virtualHost the virtual host to use when connecting to the broker
+ * @return the Builder
+ */
+ public Builder setVirtualHost(String virtualHost) {
+ this.virtualHost = virtualHost;
+ return this;
+ }
+
+ /**
+ * Set the user name.
+ * @param username the AMQP user name to use when connecting to the broker
+ * @return the Builder
+ */
+ public Builder setUserName(String username) {
+ this.username = username;
+ return this;
+ }
+
+ /**
+ * Set the password.
+ * @param password the password to use when connecting to the broker
+ * @return the Builder
+ */
+ public Builder setPassword(String password) {
+ this.password = password;
+ return this;
+ }
+
+ /**
+ * Convenience method for setting the fields in an AMQP URI: host,
+ * port, username, password and virtual host. If any part of the
+ * URI is omitted, the ConnectionFactory's corresponding variable
+ * is left unchanged.
+ * @param uri is the AMQP URI containing the data
+ * @return the Builder
+ */
+ public Builder setUri(String uri) {
+ this.uri = uri;
+ return this;
+ }
+
+ /**
+ * Enables or disables topology recovery
+ * @param topologyRecovery if true, enables topology recovery
+ * @return the Builder
+ */
+ public Builder setTopologyRecoveryEnabled(boolean topologyRecovery) {
+ this.topologyRecovery = topologyRecovery;
+ return this;
+ }
+
+ /**
+ * Set the requested heartbeat.
+ * @param requestedHeartbeat the initially requested heartbeat interval, in seconds; zero for none
+ * @return the Builder
+ */
+ public Builder setRequestedHeartbeat(int requestedHeartbeat) {
+ this.requestedHeartbeat = requestedHeartbeat;
+ return this;
+ }
+
+ /**
+ * Set the requested maximum frame size
+ * @param requestedFrameMax initially requested maximum frame size, in octets; zero for unlimited
+ * @return the Builder
+ */
+ public Builder setRequestedFrameMax(int requestedFrameMax) {
+ this.requestedFrameMax = requestedFrameMax;
+ return this;
+ }
+
+ /**
+ * Set the requested maximum channel number
+ * @param requestedChannelMax initially requested maximum channel number; zero for unlimited
+ * @return the Builder
+ */
+ public Builder setRequestedChannelMax(int requestedChannelMax) {
+ this.requestedChannelMax = requestedChannelMax;
+ return this;
+ }
+
+ /**
+ * Sets connection recovery interval. Default is 5000.
+ * @param networkRecoveryInterval how long will automatic recovery wait before attempting to reconnect, in ms
+ * @return the Builder
+ */
+ public Builder setNetworkRecoveryInterval(int networkRecoveryInterval) {
+ this.networkRecoveryInterval = networkRecoveryInterval;
+ return this;
+ }
+
+ /**
+ * Set the connection timeout.
+ * @param connectionTimeout connection establishment timeout in milliseconds; zero for infinite
+ * @return the Builder
+ */
+ public Builder setConnectionTimeout(int connectionTimeout) {
+ this.connectionTimeout = connectionTimeout;
+ return this;
+ }
+
+ /**
+ * Enables or disables automatic connection recovery
+ * @param automaticRecovery if true, enables connection recovery
+ * @return the Builder
+ */
+ public Builder setAutomaticRecovery(boolean automaticRecovery) {
+ this.automaticRecovery = automaticRecovery;
+ return this;
+ }
+
+ /**
+ * Builds the {@link RMQConnectionConfig}.
+ *
+ * <p>If a non-empty URI has been set, it is used to initialize the client connection via
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}.
+ * Otherwise the host, port, vHost, username, password combination is used via
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String,
+ * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}.
+ *
+ * @return RMQConnectionConfig
+ * @throws NullPointerException if no non-empty URI is set and host, port, virtual host,
+ * username or password is missing
+ */
+ public RMQConnectionConfig build(){
+ // An empty URI must not select the URI-based constructor: getConnectionFactory() ignores
+ // empty URIs and would then read the host/port fields that this constructor leaves unset.
+ if (this.uri != null && !this.uri.isEmpty()) {
+ return new RMQConnectionConfig(this.uri, this.networkRecoveryInterval,
+ this.automaticRecovery, this.topologyRecovery, this.connectionTimeout, this.requestedChannelMax,
+ this.requestedFrameMax, this.requestedHeartbeat);
+ } else {
+ return new RMQConnectionConfig(this.host, this.port, this.virtualHost, this.username, this.password,
+ this.networkRecoveryInterval, this.automaticRecovery, this.topologyRecovery,
+ this.connectionTimeout, this.requestedChannelMax, this.requestedFrameMax, this.requestedHeartbeat);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
new file mode 100644
index 0000000..b63c835
--- /dev/null
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.QueueingConsumer;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.SerializedCheckpointData;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.modules.junit4.PowerMockRunner;
+import com.rabbitmq.client.Connection;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+/**
+ * Tests for the RMQSource. The source supports three operation modes.
+ * 1) Exactly-once (when checkpointed) with RabbitMQ transactions and the deduplication mechanism in
+ * {@link org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase}.
+ * 2) At-least-once (when checkpointed) with RabbitMQ transactions but not deduplication.
+ * 3) No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
+ *
+ * This test assumes that the message ids are increasing monotonically. That doesn't have to be the
+ * case. The correlation id is used to uniquely identify messages.
+ */
+@RunWith(PowerMockRunner.class)
+public class RMQSourceTest {
+
+ private RMQSource<String> source;
+
+ private Configuration config = new Configuration();
+
+ private Thread sourceThread;
+
+ private volatile long messageId;
+
+ private boolean generateCorrelationIds;
+
+ private volatile Exception exception;
+
+ @Before
+ public void beforeTest() throws Exception {
+
+ source = new RMQTestSource();
+ source.open(config);
+
+ messageId = 0;
+ generateCorrelationIds = true;
+
+ sourceThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ source.run(new DummySourceContext());
+ } catch (Exception e) {
+ exception = e;
+ }
+ }
+ });
+ }
+
+ @After
+ public void afterTest() throws Exception {
+ source.cancel();
+ sourceThread.join();
+ }
+
+ /**
+ * Opening the source must fail with a descriptive error when the connection returns no channel.
+ */
+ @Test
+ public void throwExceptionIfConnectionFactoryReturnNull() throws Exception {
+ RMQConnectionConfig connectionConfig = Mockito.mock(RMQConnectionConfig.class);
+ ConnectionFactory connectionFactory = Mockito.mock(ConnectionFactory.class);
+ Connection connection = Mockito.mock(Connection.class);
+ Mockito.when(connectionConfig.getConnectionFactory()).thenReturn(connectionFactory);
+ Mockito.when(connectionFactory.newConnection()).thenReturn(connection);
+ // The connection hands out no channel, which is the failure under test.
+ Mockito.when(connection.createChannel()).thenReturn(null);
+
+ RMQSource<String> rmqSource = new RMQSource<>(
+ connectionConfig, "queueDummy", true, new StringDeserializationScheme());
+ try {
+ rmqSource.open(new Configuration());
+ // Without this, the test would pass silently if open() stopped throwing.
+ fail("Expected a RuntimeException because no RabbitMQ channel is available");
+ } catch (RuntimeException ex) {
+ assertEquals("None of RabbitMQ channels are available", ex.getMessage());
+ }
+ }
+
+ @Test
+ public void testCheckpointing() throws Exception {
+ source.autoAck = false;
+ sourceThread.start();
+
+ Thread.sleep(5);
+
+ final Random random = new Random(System.currentTimeMillis());
+ int numSnapshots = 50;
+ long previousSnapshotId;
+ long lastSnapshotId = 0;
+
+ long totalNumberOfAcks = 0;
+
+ for (int i=0; i < numSnapshots; i++) {
+ long snapshotId = random.nextLong();
+ SerializedCheckpointData[] data;
+
+ synchronized (DummySourceContext.lock) {
+ data = source.snapshotState(snapshotId, System.currentTimeMillis());
+ previousSnapshotId = lastSnapshotId;
+ lastSnapshotId = messageId;
+ }
+ // let some time pass
+ Thread.sleep(5);
+
+ // check if the correct number of messages have been snapshotted
+ final long numIds = lastSnapshotId - previousSnapshotId;
+ assertEquals(numIds, data[0].getNumIds());
+ // deserialize and check if the last id equals the last snapshotted id
+ ArrayDeque<Tuple2<Long, List<String>>> deque = SerializedCheckpointData.toDeque(data, new StringSerializer());
+ List<String> messageIds = deque.getLast().f1;
+ if (messageIds.size() > 0) {
+ assertEquals(lastSnapshotId, (long) Long.valueOf(messageIds.get(messageIds.size() - 1)));
+ }
+
+ // check if the messages are being acknowledged and the transaction committed
+ synchronized (DummySourceContext.lock) {
+ source.notifyCheckpointComplete(snapshotId);
+ }
+ totalNumberOfAcks += numIds;
+
+ }
+
+ Mockito.verify(source.channel, Mockito.times((int) totalNumberOfAcks)).basicAck(Mockito.anyLong(), Mockito.eq(false));
+ Mockito.verify(source.channel, Mockito.times(numSnapshots)).txCommit();
+
+ }
+
+ /**
+ * Checks whether recurring ids are processed again (they shouldn't be).
+ */
+ @Test
+ public void testDuplicateId() throws Exception {
+ source.autoAck = false;
+ sourceThread.start();
+
+ while (messageId < 10) {
+ // wait until messages have been processed
+ Thread.sleep(5);
+ }
+
+ long oldMessageId;
+ synchronized (DummySourceContext.lock) {
+ oldMessageId = messageId;
+ messageId = 0;
+ }
+
+ while (messageId < 10) {
+ // process again
+ Thread.sleep(5);
+ }
+
+ synchronized (DummySourceContext.lock) {
+ assertEquals(Math.max(messageId, oldMessageId), DummySourceContext.numElementsCollected);
+ }
+ }
+
+
+ /**
+ * The source should not acknowledge ids in auto-commit mode or check for previously acknowledged ids
+ */
+ @Test
+ public void testCheckpointingDisabled() throws Exception {
+ source.autoAck = true;
+ sourceThread.start();
+
+ while (DummySourceContext.numElementsCollected < 50) {
+ // wait until messages have been processed
+ Thread.sleep(5);
+ }
+
+ // see addId in RMQTestSource.addId for the assert
+ }
+
+ /**
+ * Tests error reporting in case of invalid correlation ids
+ */
+ @Test
+ public void testCorrelationIdNotSet() throws InterruptedException {
+ generateCorrelationIds = false;
+ source.autoAck = false;
+ sourceThread.start();
+
+ sourceThread.join();
+
+ assertNotNull(exception);
+ assertTrue(exception instanceof NullPointerException);
+ }
+
+ /**
+ * Tests whether constructor params are passed correctly.
+ */
+ @Test
+ public void testConstructorParams() throws Exception {
+ // verify construction params
+ RMQConnectionConfig.Builder builder = new RMQConnectionConfig.Builder();
+ builder.setHost("hostTest").setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/");
+ ConstructorTestClass testObj = new ConstructorTestClass(
+ builder.build(), "queueTest", false, new StringDeserializationScheme());
+
+ try {
+ testObj.open(new Configuration());
+ } catch (Exception e) {
+ // connection fails but check if args have been passed correctly
+ }
+
+ assertEquals("hostTest", testObj.getFactory().getHost());
+ assertEquals(999, testObj.getFactory().getPort());
+ assertEquals("userTest", testObj.getFactory().getUsername());
+ assertEquals("passTest", testObj.getFactory().getPassword());
+ }
+
+ private static class ConstructorTestClass extends RMQSource<String> {
+
+ private ConnectionFactory factory;
+
+ public ConstructorTestClass(RMQConnectionConfig rmqConnectionConfig,
+ String queueName,
+ boolean usesCorrelationId,
+ DeserializationSchema<String> deserializationSchema) throws Exception {
+ super(rmqConnectionConfig, queueName, usesCorrelationId, deserializationSchema);
+ RMQConnectionConfig.Builder builder = new RMQConnectionConfig.Builder();
+ builder.setHost("hostTest").setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/");
+ factory = Mockito.spy(builder.build().getConnectionFactory());
+ try {
+ Mockito.doThrow(new RuntimeException()).when(factory).newConnection();
+ } catch (IOException e) {
+ fail("Failed to stub connection method");
+ }
+ }
+
+ @Override
+ protected ConnectionFactory setupConnectionFactory() {
+ return factory;
+ }
+
+ public ConnectionFactory getFactory() {
+ return factory;
+ }
+ }
+
+ /**
+ * Test schema that turns each message body into a {@link String}, pausing briefly per message
+ * and never signalling the end of the stream.
+ */
+ private static class StringDeserializationScheme implements DeserializationSchema<String> {
+
+ /**
+ * Converts the raw message bytes into a string after a one millisecond pause.
+ *
+ * @param message raw message body
+ * @return the message decoded with the platform default charset
+ */
+ @Override
+ public String deserialize(byte[] message) throws IOException {
+ try {
+ // wait a bit to not cause too much cpu load
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so the calling thread can still observe the interruption.
+ Thread.currentThread().interrupt();
+ }
+ return new String(message);
+ }
+
+ /** @return always {@code false}; this schema never ends the stream */
+ @Override
+ public boolean isEndOfStream(String nextElement) {
+ return false;
+ }
+
+ /** @return the type information for {@link String} */
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return TypeExtractor.getForClass(String.class);
+ }
+ }
+
+ private class RMQTestSource extends RMQSource<String> {
+
+ public RMQTestSource() {
+ super(new RMQConnectionConfig.Builder().setHost("hostTest")
+ .setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/").build()
+ , "queueDummy", true, new StringDeserializationScheme());
+ }
+
+ @Override
+ public void open(Configuration config) throws Exception {
+ super.open(config);
+
+ consumer = Mockito.mock(QueueingConsumer.class);
+
+ // Mock for delivery
+ final QueueingConsumer.Delivery deliveryMock = Mockito.mock(QueueingConsumer.Delivery.class);
+ Mockito.when(deliveryMock.getBody()).thenReturn("test".getBytes());
+
+ try {
+ Mockito.when(consumer.nextDelivery()).thenReturn(deliveryMock);
+ } catch (InterruptedException e) {
+ fail("Couldn't setup up deliveryMock");
+ }
+
+ // Mock for envelope
+ Envelope envelope = Mockito.mock(Envelope.class);
+ Mockito.when(deliveryMock.getEnvelope()).thenReturn(envelope);
+
+ Mockito.when(envelope.getDeliveryTag()).thenAnswer(new Answer<Long>() {
+ @Override
+ public Long answer(InvocationOnMock invocation) throws Throwable {
+ return ++messageId;
+ }
+ });
+
+ // Mock for properties
+ AMQP.BasicProperties props = Mockito.mock(AMQP.BasicProperties.class);
+ Mockito.when(deliveryMock.getProperties()).thenReturn(props);
+
+ Mockito.when(props.getCorrelationId()).thenAnswer(new Answer<String>() {
+ @Override
+ public String answer(InvocationOnMock invocation) throws Throwable {
+ return generateCorrelationIds ? "" + messageId : null;
+ }
+ });
+
+ }
+
+ @Override
+ protected ConnectionFactory setupConnectionFactory() {
+ ConnectionFactory connectionFactory = Mockito.mock(ConnectionFactory.class);
+ Connection connection = Mockito.mock(Connection.class);
+ try {
+ Mockito.when(connectionFactory.newConnection()).thenReturn(connection);
+ Mockito.when(connection.createChannel()).thenReturn(Mockito.mock(Channel.class));
+ } catch (IOException e) {
+ fail("Test environment couldn't be created.");
+ }
+ return connectionFactory;
+ }
+
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ return Mockito.mock(StreamingRuntimeContext.class);
+ }
+
+ @Override
+ protected boolean addId(String uid) {
+ assertEquals(false, autoAck);
+ return super.addId(uid);
+ }
+ }
+
+ private static class DummySourceContext implements SourceFunction.SourceContext<String> {
+
+ private static final Object lock = new Object();
+
+ private static long numElementsCollected;
+
+ public DummySourceContext() {
+ numElementsCollected = 0;
+ }
+
+ @Override
+ public void collect(String element) {
+ numElementsCollected++;
+ }
+
+ @Override
+ public void collectWithTimestamp(java.lang.String element, long timestamp) {
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return lock;
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
new file mode 100644
index 0000000..40985ce
--- /dev/null
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.rabbitmq.client.ConnectionFactory;
+import org.junit.Test;
+
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class RMQConnectionConfigTest {
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointExceptionIfHostIsNull() throws NoSuchAlgorithmException,
+ KeyManagementException, URISyntaxException {
+ RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+ .setPort(1000).setUserName("guest")
+ .setPassword("guest").setVirtualHost("/").build();
+ connectionConfig.getConnectionFactory();
+ }
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointExceptionIfPortIsNull() throws NoSuchAlgorithmException,
+ KeyManagementException, URISyntaxException {
+ RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+ .setHost("localhost").setUserName("guest")
+ .setPassword("guest").setVirtualHost("/").build();
+ connectionConfig.getConnectionFactory();
+ }
+
+ /**
+ * The factory must keep its default connection timeout when none is configured.
+ */
+ @Test
+ public void shouldSetDefaultValueIfConnectionTimeoutNotGiven() throws NoSuchAlgorithmException,
+ KeyManagementException, URISyntaxException {
+ // All mandatory settings, including the port, are supplied so that build() succeeds and
+ // the default-timeout assertion below is actually reached.
+ RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+ .setHost("localhost").setPort(5000).setUserName("guest")
+ .setPassword("guest").setVirtualHost("/").build();
+ ConnectionFactory factory = connectionConfig.getConnectionFactory();
+ assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, factory.getConnectionTimeout());
+ }
+
+ @Test
+ public void shouldSetProvidedValueIfConnectionTimeoutNotGiven() throws NoSuchAlgorithmException,
+ KeyManagementException, URISyntaxException {
+ RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+ .setHost("localhost").setPort(5000).setUserName("guest")
+ .setPassword("guest").setVirtualHost("/")
+ .setConnectionTimeout(5000).build();
+ ConnectionFactory factory = connectionConfig.getConnectionFactory();
+ assertEquals(5000, factory.getConnectionTimeout());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
new file mode 100644
index 0000000..199cd1e
--- /dev/null
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for {@link RMQSink} that run against mocked RabbitMQ client objects
+ * (connection config, connection factory, connection and channel).
+ */
+public class RMQSinkTest {
+
+    private static final String QUEUE_NAME = "queue";
+    private static final String MESSAGE_STR = "msg";
+    private static final byte[] MESSAGE = new byte[1];
+
+    private RMQConnectionConfig rmqConnectionConfig;
+    private ConnectionFactory connectionFactory;
+    private Connection connection;
+    private Channel channel;
+    private SerializationSchema<String> serializationSchema;
+
+    /**
+     * Builds the mock chain config -> factory -> connection -> channel that every test
+     * relies on, and wraps the dummy schema in a spy so calls to it can be verified.
+     */
+    @Before
+    public void before() throws Exception {
+        serializationSchema = spy(new DummySerializationSchema());
+        rmqConnectionConfig = mock(RMQConnectionConfig.class);
+        connectionFactory = mock(ConnectionFactory.class);
+        connection = mock(Connection.class);
+        channel = mock(Channel.class);
+
+        when(rmqConnectionConfig.getConnectionFactory()).thenReturn(connectionFactory);
+        when(connectionFactory.newConnection()).thenReturn(connection);
+        when(connection.createChannel()).thenReturn(channel);
+    }
+
+    /** Opening the sink must declare the configured queue on the channel. */
+    @Test
+    public void openCallDeclaresQueue() throws Exception {
+        createRMQSink();
+
+        verify(channel).queueDeclare(QUEUE_NAME, false, false, false, null);
+    }
+
+    /** Opening the sink must fail with a descriptive error when no channel can be created. */
+    @Test
+    public void throwExceptionIfChannelIsNull() throws Exception {
+        when(connection.createChannel()).thenReturn(null);
+        try {
+            createRMQSink();
+        } catch (RuntimeException ex) {
+            assertEquals("None of RabbitMQ channels are available", ex.getMessage());
+            return;
+        }
+        // Without this the test would pass silently if open() stopped throwing.
+        org.junit.Assert.fail("Expected a RuntimeException because no channel is available");
+    }
+
+    /**
+     * Creates a sink wired to the mocked connection config and opens it.
+     *
+     * @return the opened sink
+     */
+    private RMQSink<String> createRMQSink() throws Exception {
+        RMQSink<String> rmqSink = new RMQSink<String>(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+        rmqSink.open(new Configuration());
+        return rmqSink;
+    }
+
+    /** Invoking the sink must serialize the element and publish the bytes to the queue. */
+    @Test
+    public void invokePublishBytesToQueue() throws Exception {
+        RMQSink<String> rmqSink = createRMQSink();
+
+        rmqSink.invoke(MESSAGE_STR);
+        verify(serializationSchema).serialize(MESSAGE_STR);
+        verify(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
+    }
+
+    /** By default a publish failure must surface to the caller as a RuntimeException. */
+    @Test(expected = RuntimeException.class)
+    public void exceptionDuringPublishingIsNotIgnored() throws Exception {
+        RMQSink<String> rmqSink = createRMQSink();
+
+        doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
+        rmqSink.invoke(MESSAGE_STR);
+    }
+
+    /** With log-failures-only enabled, a publish failure must not propagate. */
+    @Test
+    public void exceptionDuringPublishingIsIgnoredIfLogFailuresOnly() throws Exception {
+        RMQSink<String> rmqSink = createRMQSink();
+        rmqSink.setLogFailuresOnly(true);
+
+        doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
+        // The test passes as long as this call returns normally.
+        rmqSink.invoke(MESSAGE_STR);
+    }
+
+    /** Closing the sink must close both the channel and the connection. */
+    @Test
+    public void closeAllResources() throws Exception {
+        RMQSink<String> rmqSink = createRMQSink();
+
+        rmqSink.close();
+
+        verify(channel).close();
+        verify(connection).close();
+    }
+
+    /**
+     * Schema that ignores its input and always returns {@link #MESSAGE}.
+     * Declared static because it needs no state from the enclosing test instance.
+     */
+    private static class DummySerializationSchema implements SerializationSchema<String> {
+        @Override
+        public byte[] serialize(String element) {
+            return MESSAGE;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/pom.xml b/flink-connectors/flink-connector-redis/pom.xml
new file mode 100644
index 0000000..a348f31
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <!-- Inherits from the flink-connectors parent module one directory up. -->
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connectors</artifactId>
+ <version>1.2-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-redis_2.10</artifactId>
+ <name>flink-connector-redis</name>
+
+ <packaging>jar</packaging>
+
+ <!-- Version of the Jedis client, referenced by the jedis dependency below. -->
+ <properties>
+ <jedis.version>2.8.0</jedis.version>
+ </properties>
+
+ <dependencies>
+ <!-- Flink streaming API; 'provided' scope, so it is not packaged with this connector. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Jedis Redis client (compile scope). -->
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ <version>${jedis.version}</version>
+ </dependency>
+
+ <!-- Test scope only; presumably provides an in-process Redis server for tests (confirm against the test sources). -->
+ <dependency>
+ <groupId>com.github.kstyrc</groupId>
+ <artifactId>embedded-redis</artifactId>
+ <version>0.6</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- The test-jar of flink-streaming-java, in addition to the 'provided' main artifact above. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <!-- Flink test utilities, test scope only. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
new file mode 100644
index 0000000..f6b0fd7
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ * <p> The sink takes two arguments {@link FlinkJedisConfigBase} and {@link RedisMapper}.
+ * <p> When {@link FlinkJedisPoolConfig} is passed as the first argument,
+ * the sink will create connection using {@link redis.clients.jedis.JedisPool}. Please use this when
+ * you want to connect to a single Redis server.
+ * <p> When {@link FlinkJedisSentinelConfig} is passed as the first argument, the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when you want to connect to Sentinel.
+ * <p> Please use {@link FlinkJedisClusterConfig} as the first argument if you want to connect to
+ * a Redis Cluster.
+ *
+ * <p>Example:
+ *
+ * <pre>
+ *{@code
+ *public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {
+ *
+ *    private RedisCommand redisCommand;
+ *
+ *    public RedisExampleMapper(RedisCommand redisCommand){
+ *        this.redisCommand = redisCommand;
+ *    }
+ *    public RedisCommandDescription getCommandDescription() {
+ *        return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY);
+ *    }
+ *    public String getKeyFromData(Tuple2<String, String> data) {
+ *        return data.f0;
+ *    }
+ *    public String getValueFromData(Tuple2<String, String> data) {
+ *        return data.f1;
+ *    }
+ *}
+ *FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
+ *    .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink<Tuple2<String, String>>(jedisPoolConfig, new RedisExampleMapper(RedisCommand.LPUSH));
+ *}</pre>
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class RedisSink<IN> extends RichSinkFunction<IN> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class);
+
+    /**
+     * This additional key is needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
+     * Other {@link RedisDataType}s work with only two variables, i.e. the name of the list and the value to be added.
+     * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables.
+     * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element.
+     * {@code additionalKey} is used as hash name for {@link RedisDataType#HASH}
+     * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and its score.
+     * {@code additionalKey} is used as set name for {@link RedisDataType#SORTED_SET}
+     */
+    private final String additionalKey;
+    private final RedisMapper<IN> redisSinkMapper;
+    private final RedisCommand redisCommand;
+
+    private final FlinkJedisConfigBase flinkJedisConfigBase;
+    /** Created in {@link #open(Configuration)}; stays {@code null} until then. */
+    private RedisCommandsContainer redisCommandsContainer;
+
+    /**
+     * Creates a new {@link RedisSink} that connects to the Redis server.
+     *
+     * @param flinkJedisConfigBase The configuration of {@link FlinkJedisConfigBase}; must not be null
+     * @param redisSinkMapper This is used to generate Redis command and key value from incoming elements;
+     *                        neither the mapper nor its command description may be null
+     */
+    public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) {
+        Preconditions.checkNotNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
+        Preconditions.checkNotNull(redisSinkMapper, "Redis Mapper can not be null");
+        // Ask the mapper only once, so the instance that is null-checked is the instance that is used.
+        RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
+        Preconditions.checkNotNull(redisCommandDescription, "Redis Mapper data type description can not be null");
+
+        this.flinkJedisConfigBase = flinkJedisConfigBase;
+        this.redisSinkMapper = redisSinkMapper;
+        this.redisCommand = redisCommandDescription.getCommand();
+        this.additionalKey = redisCommandDescription.getAdditionalKey();
+    }
+
+    /**
+     * Called when new data arrives to the sink, and forwards it to Redis channel.
+     * Depending on the specified Redis data type (see {@link RedisDataType}),
+     * a different Redis command will be applied.
+     * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD, HSET, ZADD.
+     *
+     * @param input The incoming data
+     * @throws IllegalArgumentException if the configured command is none of the commands listed above
+     */
+    @Override
+    public void invoke(IN input) throws Exception {
+        String key = redisSinkMapper.getKeyFromData(input);
+        String value = redisSinkMapper.getValueFromData(input);
+
+        switch (redisCommand) {
+            case RPUSH:
+                this.redisCommandsContainer.rpush(key, value);
+                break;
+            case LPUSH:
+                this.redisCommandsContainer.lpush(key, value);
+                break;
+            case SADD:
+                this.redisCommandsContainer.sadd(key, value);
+                break;
+            case SET:
+                this.redisCommandsContainer.set(key, value);
+                break;
+            case PFADD:
+                this.redisCommandsContainer.pfadd(key, value);
+                break;
+            case PUBLISH:
+                this.redisCommandsContainer.publish(key, value);
+                break;
+            case ZADD:
+                // Note the argument order: the mapped value comes before the mapped key here.
+                // NOTE(review): presumably zadd takes (set name, score, element) - confirm
+                // against RedisCommandsContainer#zadd.
+                this.redisCommandsContainer.zadd(this.additionalKey, value, key);
+                break;
+            case HSET:
+                this.redisCommandsContainer.hset(this.additionalKey, key, value);
+                break;
+            default:
+                throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
+        }
+    }
+
+    /**
+     * Initializes the connection to Redis by building the commands container that matches
+     * the supplied configuration and opening it.
+     *
+     * @param parameters not used by this sink
+     * @throws Exception if the container cannot be built or opened; the failure is logged and rethrown
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        try {
+            this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
+            this.redisCommandsContainer.open();
+        } catch (Exception e) {
+            LOG.error("Redis has not been properly initialized: ", e);
+            throw e;
+        }
+    }
+
+    /**
+     * Closes commands container, if one was created.
+     * @throws IOException if command container is unable to close.
+     */
+    @Override
+    public void close() throws IOException {
+        if (redisCommandsContainer != null) {
+            redisCommandsContainer.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
new file mode 100644
index 0000000..6e6cfe5
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Connection settings for a Redis Cluster: the cluster nodes, the redirection limit
+ * and the timeout / pool settings inherited from {@link FlinkJedisConfigBase}.
+ */
+public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
+    private static final long serialVersionUID = 1L;
+
+    private final Set<InetSocketAddress> nodes;
+    private final int maxRedirections;
+
+    /**
+     * Creates the cluster configuration. Instances are obtained through {@link Builder}.
+     * The node set is mandatory: it is checked with {@code Preconditions.checkNotNull} and
+     * must additionally be non-empty. A private copy of the set is stored.
+     *
+     * @param nodes node addresses of the cluster
+     * @param connectionTimeout socket / connection timeout
+     * @param maxRedirections limit of redirections, i.e. how often MOVED or ASK is followed
+     * @param maxTotal the maximum number of objects that can be allocated by the pool
+     * @param maxIdle the cap on the number of "idle" instances in the pool
+     * @param minIdle the minimum number of idle objects to maintain in the pool
+     * @throws NullPointerException if parameter {@code nodes} is {@code null}
+     */
+    private FlinkJedisClusterConfig(
+            Set<InetSocketAddress> nodes,
+            int connectionTimeout,
+            int maxRedirections,
+            int maxTotal,
+            int maxIdle,
+            int minIdle) {
+        super(connectionTimeout, maxTotal, maxIdle, minIdle);
+
+        Preconditions.checkNotNull(nodes, "Node information should be presented");
+        Preconditions.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
+        this.maxRedirections = maxRedirections;
+        this.nodes = new HashSet<>(nodes);
+    }
+
+    /**
+     * Converts the configured node addresses into Jedis {@link HostAndPort} objects.
+     * A new set is built on every call.
+     *
+     * @return set of node information
+     */
+    public Set<HostAndPort> getNodes() {
+        final Set<HostAndPort> hostAndPorts = new HashSet<>();
+        for (InetSocketAddress address : this.nodes) {
+            final String hostName = address.getHostName();
+            final int port = address.getPort();
+            hostAndPorts.add(new HostAndPort(hostName, port));
+        }
+        return hostAndPorts;
+    }
+
+    /**
+     * Returns the redirection limit.
+     *
+     * @return limit of redirection
+     */
+    public int getMaxRedirections() {
+        return this.maxRedirections;
+    }
+
+    /**
+     * Builder for initializing {@link FlinkJedisClusterConfig}. Everything except the node
+     * set starts out with a default value.
+     */
+    public static class Builder {
+        private Set<InetSocketAddress> nodes;
+        private int timeout = Protocol.DEFAULT_TIMEOUT;
+        private int maxRedirections = 5;
+        private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+        private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+        private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+
+        /**
+         * Sets the cluster nodes.
+         *
+         * @param nodes set of node addresses
+         * @return Builder itself
+         */
+        public Builder setNodes(Set<InetSocketAddress> nodes) {
+            this.nodes = nodes;
+            return this;
+        }
+
+        /**
+         * Sets the socket / connection timeout.
+         *
+         * @param timeout socket / connection timeout, defaults to {@code Protocol.DEFAULT_TIMEOUT}
+         * @return Builder itself
+         */
+        public Builder setTimeout(int timeout) {
+            this.timeout = timeout;
+            return this;
+        }
+
+        /**
+         * Sets the redirection limit.
+         *
+         * @param maxRedirections limit of redirection, default value is 5
+         * @return Builder itself
+         */
+        public Builder setMaxRedirections(int maxRedirections) {
+            this.maxRedirections = maxRedirections;
+            return this;
+        }
+
+        /**
+         * Sets the {@code maxTotal} attribute for pools created from this configuration.
+         *
+         * @param maxTotal the maximum number of objects that can be allocated by the pool,
+         *                 defaults to {@code GenericObjectPoolConfig.DEFAULT_MAX_TOTAL}
+         * @return Builder itself
+         */
+        public Builder setMaxTotal(int maxTotal) {
+            this.maxTotal = maxTotal;
+            return this;
+        }
+
+        /**
+         * Sets the {@code maxIdle} attribute for pools created from this configuration.
+         *
+         * @param maxIdle the cap on the number of "idle" instances in the pool,
+         *                defaults to {@code GenericObjectPoolConfig.DEFAULT_MAX_IDLE}
+         * @return Builder itself
+         */
+        public Builder setMaxIdle(int maxIdle) {
+            this.maxIdle = maxIdle;
+            return this;
+        }
+
+        /**
+         * Sets the {@code minIdle} attribute for pools created from this configuration.
+         *
+         * @param minIdle the minimum number of idle objects to maintain in the pool,
+         *                defaults to {@code GenericObjectPoolConfig.DEFAULT_MIN_IDLE}
+         * @return Builder itself
+         */
+        public Builder setMinIdle(int minIdle) {
+            this.minIdle = minIdle;
+            return this;
+        }
+
+        /**
+         * Creates the configuration from the values collected so far.
+         *
+         * @return the new {@link FlinkJedisClusterConfig}
+         */
+        public FlinkJedisClusterConfig build() {
+            return new FlinkJedisClusterConfig(
+                this.nodes, this.timeout, this.maxRedirections, this.maxTotal, this.maxIdle, this.minIdle);
+        }
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("JedisClusterConfig{");
+        text.append("nodes=").append(nodes);
+        text.append(", timeout=").append(connectionTimeout);
+        text.append(", maxRedirections=").append(maxRedirections);
+        text.append(", maxTotal=").append(maxTotal);
+        text.append(", maxIdle=").append(maxIdle);
+        text.append(", minIdle=").append(minIdle);
+        text.append('}');
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
new file mode 100644
index 0000000..a2489b8
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Serializable base class of all Flink Redis configurations. It holds the settings every
+ * variant shares: the connection timeout and the pool limits.
+ */
+public abstract class FlinkJedisConfigBase implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    protected final int maxTotal;
+    protected final int maxIdle;
+    protected final int minIdle;
+    protected final int connectionTimeout;
+
+    /**
+     * Stores the shared settings after checking, via {@code Preconditions.checkArgument},
+     * that none of them is negative.
+     *
+     * @param connectionTimeout connection timeout
+     * @param maxTotal value of the {@code maxTotal} pool attribute
+     * @param maxIdle value of the {@code maxIdle} pool attribute
+     * @param minIdle value of the {@code minIdle} pool attribute
+     */
+    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle) {
+        // All checks run before any field is written; their order decides which message is reported first.
+        Preconditions.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative");
+        Preconditions.checkArgument(maxTotal >= 0, "maxTotal value can not be negative");
+        Preconditions.checkArgument(maxIdle >= 0, "maxIdle value can not be negative");
+        Preconditions.checkArgument(minIdle >= 0, "minIdle value can not be negative");
+
+        this.maxTotal = maxTotal;
+        this.maxIdle = maxIdle;
+        this.minIdle = minIdle;
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    /**
+     * Returns the connection timeout.
+     *
+     * @return connection timeout
+     */
+    public int getConnectionTimeout() {
+        return this.connectionTimeout;
+    }
+
+    /**
+     * Returns the {@code maxTotal} attribute used for pools created from this configuration.
+     *
+     * @return the {@code maxTotal} setting of this configuration instance
+     * @see GenericObjectPoolConfig#getMaxTotal()
+     */
+    public int getMaxTotal() {
+        return this.maxTotal;
+    }
+
+    /**
+     * Returns the {@code maxIdle} attribute used for pools created from this configuration.
+     *
+     * @return the {@code maxIdle} setting of this configuration instance
+     * @see GenericObjectPoolConfig#getMaxIdle()
+     */
+    public int getMaxIdle() {
+        return this.maxIdle;
+    }
+
+    /**
+     * Returns the {@code minIdle} attribute used for pools created from this configuration.
+     *
+     * @return the {@code minIdle} setting of this configuration instance
+     * @see GenericObjectPoolConfig#getMinIdle()
+     */
+    public int getMinIdle() {
+        return this.minIdle;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
new file mode 100644
index 0000000..d261a35
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.Protocol;
+
+/**
+ * Connection settings for a single Redis server accessed through a Jedis pool.
+ */
+public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String host;
+    private final int port;
+    private final int database;
+    private final String password;
+
+    /**
+     * Creates the pool configuration. Instances are obtained through {@link Builder}.
+     * The host is mandatory and is checked with {@code Preconditions.checkNotNull}.
+     *
+     * @param host hostname or IP
+     * @param port port
+     * @param connectionTimeout socket / connection timeout
+     * @param password password, if any
+     * @param database database index
+     * @param maxTotal the maximum number of objects that can be allocated by the pool
+     * @param maxIdle the cap on the number of "idle" instances in the pool
+     * @param minIdle the minimum number of idle objects to maintain in the pool
+     * @throws NullPointerException if parameter {@code host} is {@code null}
+     */
+    private FlinkJedisPoolConfig(
+            String host,
+            int port,
+            int connectionTimeout,
+            String password,
+            int database,
+            int maxTotal,
+            int maxIdle,
+            int minIdle) {
+        super(connectionTimeout, maxTotal, maxIdle, minIdle);
+        Preconditions.checkNotNull(host, "Host information should be presented");
+        this.password = password;
+        this.database = database;
+        this.port = port;
+        this.host = host;
+    }
+
+    /**
+     * Returns the host.
+     *
+     * @return hostname or IP
+     */
+    public String getHost() {
+        return this.host;
+    }
+
+    /**
+     * Returns the port.
+     *
+     * @return port
+     */
+    public int getPort() {
+        return this.port;
+    }
+
+    /**
+     * Returns the database index.
+     *
+     * @return database index
+     */
+    public int getDatabase() {
+        return this.database;
+    }
+
+    /**
+     * Returns the password.
+     *
+     * @return password, may be {@code null} when none was configured
+     */
+    public String getPassword() {
+        return this.password;
+    }
+
+    /**
+     * Builder for initializing {@link FlinkJedisPoolConfig}. Host and password have no
+     * default; every other value starts out with the default shown on its field.
+     */
+    public static class Builder {
+        private String host;
+        private int port = Protocol.DEFAULT_PORT;
+        private int timeout = Protocol.DEFAULT_TIMEOUT;
+        private int database = Protocol.DEFAULT_DATABASE;
+        private String password;
+        private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+        private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+        private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+
+        /**
+         * Sets the {@code maxTotal} attribute for pools created from this configuration.
+         *
+         * @param maxTotal the maximum number of objects that can be allocated by the pool,
+         *                 defaults to {@code GenericObjectPoolConfig.DEFAULT_MAX_TOTAL}
+         * @return Builder itself
+         */
+        public Builder setMaxTotal(int maxTotal) {
+            this.maxTotal = maxTotal;
+            return this;
+        }
+
+        /**
+         * Sets the {@code maxIdle} attribute for pools created from this configuration.
+         *
+         * @param maxIdle the cap on the number of "idle" instances in the pool,
+         *                defaults to {@code GenericObjectPoolConfig.DEFAULT_MAX_IDLE}
+         * @return Builder itself
+         */
+        public Builder setMaxIdle(int maxIdle) {
+            this.maxIdle = maxIdle;
+            return this;
+        }
+
+        /**
+         * Sets the {@code minIdle} attribute for pools created from this configuration.
+         *
+         * @param minIdle the minimum number of idle objects to maintain in the pool,
+         *                defaults to {@code GenericObjectPoolConfig.DEFAULT_MIN_IDLE}
+         * @return Builder itself
+         */
+        public Builder setMinIdle(int minIdle) {
+            this.minIdle = minIdle;
+            return this;
+        }
+
+        /**
+         * Sets the host.
+         *
+         * @param host hostname or IP
+         * @return Builder itself
+         */
+        public Builder setHost(String host) {
+            this.host = host;
+            return this;
+        }
+
+        /**
+         * Sets the port.
+         *
+         * @param port port, defaults to {@code Protocol.DEFAULT_PORT}
+         * @return Builder itself
+         */
+        public Builder setPort(int port) {
+            this.port = port;
+            return this;
+        }
+
+        /**
+         * Sets the timeout.
+         *
+         * @param timeout timeout, defaults to {@code Protocol.DEFAULT_TIMEOUT}
+         * @return Builder itself
+         */
+        public Builder setTimeout(int timeout) {
+            this.timeout = timeout;
+            return this;
+        }
+
+        /**
+         * Sets the database index.
+         *
+         * @param database database index, defaults to {@code Protocol.DEFAULT_DATABASE}
+         * @return Builder itself
+         */
+        public Builder setDatabase(int database) {
+            this.database = database;
+            return this;
+        }
+
+        /**
+         * Sets the password.
+         *
+         * @param password password, if any
+         * @return Builder itself
+         */
+        public Builder setPassword(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /**
+         * Creates the configuration from the values collected so far.
+         *
+         * @return the new {@link FlinkJedisPoolConfig}
+         */
+        public FlinkJedisPoolConfig build() {
+            return new FlinkJedisPoolConfig(
+                this.host, this.port, this.timeout, this.password, this.database,
+                this.maxTotal, this.maxIdle, this.minIdle);
+        }
+    }
+
+    /** Renders every setting except the password. */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("JedisPoolConfig{");
+        text.append("host='").append(host).append('\'');
+        text.append(", port=").append(port);
+        text.append(", timeout=").append(connectionTimeout);
+        text.append(", database=").append(database);
+        text.append(", maxTotal=").append(maxTotal);
+        text.append(", maxIdle=").append(maxIdle);
+        text.append(", minIdle=").append(minIdle);
+        text.append('}');
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
new file mode 100644
index 0000000..2cdb397
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel pool.
+ *
+ * <p>Instances are created through the nested {@link Builder}. All fields are final and the
+ * set of sentinels is copied both on the way in and on the way out, so a configuration cannot
+ * be changed after it has been built.
+ */
+public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkJedisSentinelConfig.class);
+
+    private final String masterName;
+    private final Set<String> sentinels;
+    private final int soTimeout;
+    private final String password;
+    private final int database;
+
+    /**
+     * Jedis Sentinels config.
+     * The master name and sentinels are mandatory, and when you didn't set these, it throws NullPointerException.
+     *
+     * @param masterName master name of the replica set
+     * @param sentinels set of sentinel hosts
+     * @param connectionTimeout connection timeout
+     * @param soTimeout socket timeout
+     * @param password password, if any
+     * @param database database index
+     * @param maxTotal the maximum number of objects that can be allocated by the pool
+     * @param maxIdle the cap on the number of "idle" instances in the pool
+     * @param minIdle the minimum number of idle objects to maintain in the pool
+     *
+     * @throws NullPointerException if {@code masterName} or {@code sentinels} is {@code null}
+     * @throws IllegalArgumentException if {@code sentinels} are empty
+     */
+    private FlinkJedisSentinelConfig(String masterName, Set<String> sentinels,
+                                    int connectionTimeout, int soTimeout,
+                                    String password, int database,
+                                    int maxTotal, int maxIdle, int minIdle) {
+        super(connectionTimeout, maxTotal, maxIdle, minIdle);
+        Preconditions.checkNotNull(masterName, "Master name should be presented");
+        Preconditions.checkNotNull(sentinels, "Sentinels information should be presented");
+        Preconditions.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be empty");
+
+        this.masterName = masterName;
+        // Copy the caller's set so later changes to it do not leak into this configuration.
+        this.sentinels = new HashSet<>(sentinels);
+        this.soTimeout = soTimeout;
+        this.password = password;
+        this.database = database;
+    }
+
+    /**
+     * Returns master name of the replica set.
+     *
+     * @return master name of the replica set.
+     */
+    public String getMasterName() {
+        return masterName;
+    }
+
+    /**
+     * Returns Sentinels host addresses.
+     *
+     * <p>The returned set is a copy; modifying it does not affect this configuration.
+     *
+     * @return Set of Sentinels host addresses
+     */
+    public Set<String> getSentinels() {
+        // Hand out a copy instead of the internal set to keep this object immutable.
+        return new HashSet<>(sentinels);
+    }
+
+    /**
+     * Returns socket timeout.
+     *
+     * @return socket timeout
+     */
+    public int getSoTimeout() {
+        return soTimeout;
+    }
+
+    /**
+     * Returns password.
+     *
+     * @return password
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * Returns database index.
+     *
+     * @return database index
+     */
+    public int getDatabase() {
+        return database;
+    }
+
+    /**
+     * Builder for initializing {@link FlinkJedisSentinelConfig}.
+     *
+     * <p>The master name and the sentinels have no default and must be set before calling
+     * {@link #build()}; every other value starts from the Jedis / commons-pool default
+     * constants referenced in the field initializers below.
+     */
+    public static class Builder {
+        private String masterName;
+        private Set<String> sentinels;
+        private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
+        private int soTimeout = Protocol.DEFAULT_TIMEOUT;
+        private String password;
+        private int database = Protocol.DEFAULT_DATABASE;
+        private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+        private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+        private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+
+        /**
+         * Sets master name of the replica set.
+         *
+         * @param masterName master name of the replica set
+         * @return Builder itself
+         */
+        public Builder setMasterName(String masterName) {
+            this.masterName = masterName;
+            return this;
+        }
+
+        /**
+         * Sets sentinels address.
+         *
+         * @param sentinels host set of the sentinels
+         * @return Builder itself
+         */
+        public Builder setSentinels(Set<String> sentinels) {
+            this.sentinels = sentinels;
+            return this;
+        }
+
+        /**
+         * Sets connection timeout.
+         *
+         * @param connectionTimeout connection timeout, default value is 2000
+         * @return Builder itself
+         */
+        public Builder setConnectionTimeout(int connectionTimeout) {
+            this.connectionTimeout = connectionTimeout;
+            return this;
+        }
+
+        /**
+         * Sets socket timeout.
+         *
+         * @param soTimeout socket timeout, default value is 2000
+         * @return Builder itself
+         */
+        public Builder setSoTimeout(int soTimeout) {
+            this.soTimeout = soTimeout;
+            return this;
+        }
+
+        /**
+         * Sets password.
+         *
+         * @param password password, if any
+         * @return Builder itself
+         */
+        public Builder setPassword(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /**
+         * Sets database index.
+         *
+         * @param database database index, default value is 0
+         * @return Builder itself
+         */
+        public Builder setDatabase(int database) {
+            this.database = database;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code maxTotal} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
+         * @return Builder itself
+         */
+        public Builder setMaxTotal(int maxTotal) {
+            this.maxTotal = maxTotal;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code maxIdle} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
+         * @return Builder itself
+         */
+        public Builder setMaxIdle(int maxIdle) {
+            this.maxIdle = maxIdle;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code minIdle} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
+         * @return Builder itself
+         */
+        public Builder setMinIdle(int minIdle) {
+            this.minIdle = minIdle;
+            return this;
+        }
+
+        /**
+         * Builds JedisSentinelConfig.
+         *
+         * @return JedisSentinelConfig
+         * @throws NullPointerException if the master name or the sentinels were not set
+         * @throws IllegalArgumentException if the sentinels set is empty
+         */
+        public FlinkJedisSentinelConfig build() {
+            return new FlinkJedisSentinelConfig(masterName, sentinels, connectionTimeout, soTimeout,
+                password, database, maxTotal, maxIdle, minIdle);
+        }
+    }
+
+    @Override
+    public String toString() {
+        // The password and the sentinel addresses are not part of the rendered text.
+        return "JedisSentinelConfig{" +
+            "masterName='" + masterName + '\'' +
+            ", connectionTimeout=" + connectionTimeout +
+            ", soTimeout=" + soTimeout +
+            ", database=" + database +
+            ", maxTotal=" + maxTotal +
+            ", maxIdle=" + maxIdle +
+            ", minIdle=" + minIdle +
+            '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
new file mode 100644
index 0000000..d6621d6
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, Closeable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class);
+
+ private transient JedisCluster jedisCluster;
+
+ /**
+ * Initialize Redis command container for Redis cluster.
+ *
+ * @param jedisCluster JedisCluster instance
+ */
+ public RedisClusterContainer(JedisCluster jedisCluster) {
+ Preconditions.checkNotNull(jedisCluster, "Jedis cluster can not be null");
+
+ this.jedisCluster = jedisCluster;
+ }
+
+ @Override
+ public void open() throws Exception {
+
+ // echo() tries to open a connection and echos back the
+ // message passed as argument. Here we use it to monitor
+ // if we can communicate with the cluster.
+
+ jedisCluster.echo("Test");
+ }
+
+ @Override
+ public void hset(final String key, final String hashField, final String value) {
+ try {
+ jedisCluster.hset(key, hashField, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command HSET to hash {} error message {}",
+ key, hashField, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void rpush(final String listName, final String value) {
+ try {
+ jedisCluster.rpush(listName, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command RPUSH to list {} error message: {}",
+ listName, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void lpush(String listName, String value) {
+ try {
+ jedisCluster.lpush(listName, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command LPUSH to list {} error message: {}",
+ listName, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void sadd(final String setName, final String value) {
+ try {
+ jedisCluster.sadd(setName, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command RPUSH to set {} error message {}",
+ setName, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void publish(final String channelName, final String message) {
+ try {
+ jedisCluster.publish(channelName, message);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command PUBLISH to channel {} error message {}",
+ channelName, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void set(final String key, final String value) {
+ try {
+ jedisCluster.set(key, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command SET to key {} error message {}",
+ key, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void pfadd(final String key, final String element) {
+ try {
+ jedisCluster.set(key, element);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command PFADD to key {} error message {}",
+ key, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void zadd(final String key, final String score, final String element) {
+ try {
+ jedisCluster.zadd(key, Double.valueOf(score), element);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command ZADD to set {} error message {}",
+ key, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Closes the {@link JedisCluster}.
+ */
+ @Override
+ public void close() throws IOException {
+ this.jedisCluster.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
new file mode 100644
index 0000000..55dbfc2
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The container for all available Redis commands.
+ *
+ * <p>Besides the lifecycle methods {@link #open()} and {@link #close()}, each method
+ * is named after the Redis command it stands for (HSET, RPUSH, LPUSH, SADD, PUBLISH, SET,
+ * PFADD, ZADD). None of the command methods returns a result or declares a checked exception.
+ * The interface extends {@link Serializable}.
+ */
+public interface RedisCommandsContainer extends Serializable {
+
+ /**
+ * Open the Jedis container.
+ *
+ * @throws Exception if the instance can not be opened properly
+ */
+ void open() throws Exception;
+
+ /**
+ * Sets field in the hash stored at key to value.
+ * If key does not exist, a new key holding a hash is created.
+ * If field already exists in the hash, it is overwritten.
+ *
+ * @param key Hash name, i.e. the key under which the hash is stored
+ * @param hashField Hash field
+ * @param value Hash value
+ */
+ void hset(String key, String hashField, String value);
+
+ /**
+ * Insert the specified value at the tail of the list stored at key.
+ * If key does not exist, it is created as empty list before performing the push operation.
+ *
+ * @param listName Name of the List
+ * @param value Value to be added
+ */
+ void rpush(String listName, String value);
+
+ /**
+ * Insert the specified value at the head of the list stored at key.
+ * If key does not exist, it is created as empty list before performing the push operation.
+ *
+ * @param listName Name of the List
+ * @param value Value to be added
+ */
+ void lpush(String listName, String value);
+
+ /**
+ * Add the specified member to the set stored at key.
+ * A member that is already part of this set is ignored.
+ * If key does not exist, a new set is created before adding the specified member.
+ *
+ * @param setName Name of the Set
+ * @param value Value to be added
+ */
+ void sadd(String setName, String value);
+
+ /**
+ * Posts a message to the given channel.
+ *
+ * @param channelName Name of the channel to which data will be published
+ * @param message the message
+ */
+ void publish(String channelName, String message);
+
+ /**
+ * Set key to hold the string value. If key already holds a value, it is overwritten,
+ * regardless of its type. Any previous time to live associated with the key is
+ * discarded on successful SET operation.
+ *
+ * @param key the name of the key whose value is to be set
+ * @param value the value
+ */
+ void set(String key, String value);
+
+ /**
+ * Adds the element argument to the HyperLogLog data structure
+ * stored at the variable name specified as first argument.
+ *
+ * @param key The name of the key
+ * @param element the element
+ */
+ void pfadd(String key, String element);
+
+ /**
+ * Adds the specified member with the specified score to the sorted set stored at key.
+ *
+ * @param key The name of the Sorted Set
+ * @param score Score of the element, given as a string (RedisClusterContainer parses it as a double)
+ * @param element element to be added
+ */
+ void zadd(String key, String score, String element);
+
+ /**
+ * Close the Jedis container.
+ *
+ * @throws IOException if the instance can not be closed properly
+ */
+ void close() throws IOException;
+}