You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:13 UTC
[23/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
new file mode 100644
index 0000000..dc5396a
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+/**
+ * The builder for {@link RedisCommandsContainer}.
+ */
+public class RedisCommandsContainerBuilder {
+
+    /**
+     * Builds the {@link RedisCommandsContainer} that matches the concrete type of the given configuration.
+     *
+     * <p>The call is forwarded to the overload for {@link FlinkJedisPoolConfig},
+     * {@link FlinkJedisClusterConfig} or {@link FlinkJedisSentinelConfig}, checked in that order.
+     *
+     * @param flinkJedisConfigBase configuration base
+     * @return the container built from the given configuration
+     * @throws IllegalArgumentException if the configuration is {@code null} or is neither a pool,
+     *                                  a cluster nor a sentinel configuration
+     */
+    public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase) {
+        if (flinkJedisConfigBase instanceof FlinkJedisPoolConfig) {
+            return build((FlinkJedisPoolConfig) flinkJedisConfigBase);
+        }
+        if (flinkJedisConfigBase instanceof FlinkJedisClusterConfig) {
+            return build((FlinkJedisClusterConfig) flinkJedisConfigBase);
+        }
+        if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
+            return build((FlinkJedisSentinelConfig) flinkJedisConfigBase);
+        }
+        // instanceof is false for null, so a null configuration also ends up here.
+        throw new IllegalArgumentException("Jedis configuration not found");
+    }
+
+    /**
+     * Creates a container backed by a {@link JedisPool} for a single Redis server.
+     *
+     * @param jedisPoolConfig configuration for JedisPool
+     * @return container for single Redis environment
+     * @throws NullPointerException if jedisPoolConfig is null
+     */
+    public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
+        Preconditions.checkNotNull(jedisPoolConfig, "Redis pool config should not be Null");
+
+        // Carry the pool sizing limits over into the commons-pool2 configuration.
+        final GenericObjectPoolConfig poolSettings = new GenericObjectPoolConfig();
+        poolSettings.setMaxIdle(jedisPoolConfig.getMaxIdle());
+        poolSettings.setMaxTotal(jedisPoolConfig.getMaxTotal());
+        poolSettings.setMinIdle(jedisPoolConfig.getMinIdle());
+
+        return new RedisContainer(new JedisPool(
+            poolSettings,
+            jedisPoolConfig.getHost(),
+            jedisPoolConfig.getPort(),
+            jedisPoolConfig.getConnectionTimeout(),
+            jedisPoolConfig.getPassword(),
+            jedisPoolConfig.getDatabase()));
+    }
+
+    /**
+     * Creates a container backed by a {@link JedisCluster} for a Redis Cluster environment.
+     *
+     * @param jedisClusterConfig configuration for JedisCluster
+     * @return container for Redis Cluster environment
+     * @throws NullPointerException if jedisClusterConfig is null
+     */
+    public static RedisCommandsContainer build(FlinkJedisClusterConfig jedisClusterConfig) {
+        Preconditions.checkNotNull(jedisClusterConfig, "Redis cluster config should not be Null");
+
+        // Carry the pool sizing limits over into the commons-pool2 configuration.
+        final GenericObjectPoolConfig poolSettings = new GenericObjectPoolConfig();
+        poolSettings.setMaxIdle(jedisClusterConfig.getMaxIdle());
+        poolSettings.setMaxTotal(jedisClusterConfig.getMaxTotal());
+        poolSettings.setMinIdle(jedisClusterConfig.getMinIdle());
+
+        return new RedisClusterContainer(new JedisCluster(
+            jedisClusterConfig.getNodes(),
+            jedisClusterConfig.getConnectionTimeout(),
+            jedisClusterConfig.getMaxRedirections(),
+            poolSettings));
+    }
+
+    /**
+     * Creates a container backed by a {@link JedisSentinelPool} for a Redis Sentinel environment.
+     *
+     * @param jedisSentinelConfig configuration for JedisSentinel
+     * @return container for Redis sentinel environment
+     * @throws NullPointerException if jedisSentinelConfig is null
+     */
+    public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
+        Preconditions.checkNotNull(jedisSentinelConfig, "Redis sentinel config should not be Null");
+
+        // Carry the pool sizing limits over into the commons-pool2 configuration.
+        final GenericObjectPoolConfig poolSettings = new GenericObjectPoolConfig();
+        poolSettings.setMaxIdle(jedisSentinelConfig.getMaxIdle());
+        poolSettings.setMaxTotal(jedisSentinelConfig.getMaxTotal());
+        poolSettings.setMinIdle(jedisSentinelConfig.getMinIdle());
+
+        final JedisSentinelPool sentinelPool = new JedisSentinelPool(
+            jedisSentinelConfig.getMasterName(),
+            jedisSentinelConfig.getSentinels(),
+            poolSettings,
+            jedisSentinelConfig.getConnectionTimeout(),
+            jedisSentinelConfig.getSoTimeout(),
+            jedisSentinelConfig.getPassword(),
+            jedisSentinelConfig.getDatabase());
+        return new RedisContainer(sentinelPool);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
new file mode 100644
index 0000000..ba4bbda
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server or to Redis sentinels
+ * If want to connect to a single Redis server, please use the first constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, please use the second constructor {@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class);
+
+ private transient JedisPool jedisPool;
+ private transient JedisSentinelPool jedisSentinelPool;
+
+    /**
+     * Creates a container that talks to a single Redis server through the given pool.
+     *
+     * @param jedisPool JedisPool which actually manages Jedis instances
+     * @throws NullPointerException if jedisPool is null
+     */
+    public RedisContainer(JedisPool jedisPool) {
+        // checkNotNull hands back its argument, so the check and the assignment fit in one statement.
+        this.jedisPool = Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be null");
+        this.jedisSentinelPool = null;
+    }
+
+    /**
+     * Creates a container for a Redis environment that is managed by sentinels.
+     *
+     * @param sentinelPool SentinelPool which actually manages Jedis instances
+     * @throws NullPointerException if sentinelPool is null
+     */
+    public RedisContainer(final JedisSentinelPool sentinelPool) {
+        // checkNotNull hands back its argument, so the check and the assignment fit in one statement.
+        this.jedisSentinelPool = Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool can not be null");
+        this.jedisPool = null;
+    }
+
+    /**
+     * Closes whichever connection pool backs this container.
+     */
+    @Override
+    public void close() throws IOException {
+        final JedisPool plainPool = this.jedisPool;
+        final JedisSentinelPool sentinelPool = this.jedisSentinelPool;
+
+        if (plainPool != null) {
+            plainPool.close();
+        }
+        if (sentinelPool != null) {
+            sentinelPool.close();
+        }
+    }
+
+ @Override
+ public void open() throws Exception {
+
+ // echo() tries to open a connection and echos back the
+ // message passed as argument. Here we use it to monitor
+ // if we can communicate with the cluster.
+
+ getInstance().echo("Test");
+ }
+
+    /**
+     * Sends an HSET command, logging and rethrowing any failure.
+     *
+     * @param key name of the hash
+     * @param hashField field inside the hash
+     * @param value value stored under the field
+     */
+    @Override
+    public void hset(final String key, final String hashField, final String value) {
+        Jedis connection = null;
+        try {
+            connection = getInstance();
+            connection.hset(key, hashField, value);
+        } catch (Exception failure) {
+            // The logger checks the level itself, so no explicit isErrorEnabled() guard is needed.
+            LOG.error("Cannot send Redis message with command HSET to key {} and hashField {} error message {}",
+                key, hashField, failure.getMessage());
+            throw failure;
+        } finally {
+            releaseInstance(connection);
+        }
+    }
+
+    /**
+     * Sends an RPUSH command, logging and rethrowing any failure.
+     *
+     * @param listName name of the list
+     * @param value value appended to the list
+     */
+    @Override
+    public void rpush(final String listName, final String value) {
+        Jedis connection = null;
+        try {
+            connection = getInstance();
+            connection.rpush(listName, value);
+        } catch (Exception failure) {
+            // The logger checks the level itself, so no explicit isErrorEnabled() guard is needed.
+            LOG.error("Cannot send Redis message with command RPUSH to list {} error message {}",
+                listName, failure.getMessage());
+            throw failure;
+        } finally {
+            releaseInstance(connection);
+        }
+    }
+
+    /**
+     * Sends an LPUSH command, logging and rethrowing any failure.
+     *
+     * @param listName name of the list
+     * @param value value pushed onto the list
+     */
+    @Override
+    public void lpush(String listName, String value) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.lpush(listName, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                // The message used to read "LUSH", which made failures hard to find in the logs.
+                LOG.error("Cannot send Redis message with command LPUSH to list {} error message {}",
+                    listName, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    /**
+     * Sends an SADD command, logging and rethrowing any failure.
+     *
+     * @param setName name of the set
+     * @param value member added to the set
+     */
+    @Override
+    public void sadd(final String setName, final String value) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.sadd(setName, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                // The message used to name RPUSH (copy-paste slip); the command sent here is SADD.
+                LOG.error("Cannot send Redis message with command SADD to set {} error message {}",
+                    setName, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    /**
+     * Sends a PUBLISH command, logging and rethrowing any failure.
+     *
+     * @param channelName channel the message is posted to
+     * @param message message to post
+     */
+    @Override
+    public void publish(final String channelName, final String message) {
+        Jedis connection = null;
+        try {
+            connection = getInstance();
+            connection.publish(channelName, message);
+        } catch (Exception failure) {
+            // The logger checks the level itself, so no explicit isErrorEnabled() guard is needed.
+            LOG.error("Cannot send Redis message with command PUBLISH to channel {} error message {}",
+                channelName, failure.getMessage());
+            throw failure;
+        } finally {
+            releaseInstance(connection);
+        }
+    }
+
+    /**
+     * Sends a SET command, logging and rethrowing any failure.
+     *
+     * @param key key to set
+     * @param value value stored under the key
+     */
+    @Override
+    public void set(final String key, final String value) {
+        Jedis connection = null;
+        try {
+            connection = getInstance();
+            connection.set(key, value);
+        } catch (Exception failure) {
+            // The logger checks the level itself, so no explicit isErrorEnabled() guard is needed.
+            LOG.error("Cannot send Redis message with command SET to key {} error message {}",
+                key, failure.getMessage());
+            throw failure;
+        } finally {
+            releaseInstance(connection);
+        }
+    }
+
+    /**
+     * Sends a PFADD command, logging and rethrowing any failure.
+     *
+     * @param key key passed to PFADD
+     * @param element element to add
+     */
+    @Override
+    public void pfadd(final String key, final String element) {
+        Jedis connection = null;
+        try {
+            connection = getInstance();
+            connection.pfadd(key, element);
+        } catch (Exception failure) {
+            // The logger checks the level itself, so no explicit isErrorEnabled() guard is needed.
+            LOG.error("Cannot send Redis message with command PFADD to key {} error message {}",
+                key, failure.getMessage());
+            throw failure;
+        } finally {
+            releaseInstance(connection);
+        }
+    }
+
+    /**
+     * Sends a ZADD command, logging and rethrowing any failure.
+     *
+     * @param key key passed to ZADD
+     * @param score score of the element; must be parseable as a {@code double}
+     * @param element element to add
+     */
+    @Override
+    public void zadd(final String key, final String score, final String element) {
+        Jedis connection = null;
+        try {
+            connection = getInstance();
+            // Parsed inside the try block so that a malformed score is logged like any other failure.
+            final double numericScore = Double.parseDouble(score);
+            connection.zadd(key, numericScore, element);
+        } catch (Exception failure) {
+            // The logger checks the level itself, so no explicit isErrorEnabled() guard is needed.
+            LOG.error("Cannot send Redis message with command ZADD to set {} error message {}",
+                key, failure.getMessage());
+            throw failure;
+        } finally {
+            releaseInstance(connection);
+        }
+    }
+
+    /**
+     * Borrows a Jedis instance from the sentinel pool if one is set, otherwise from the plain pool.
+     *
+     * @return the Jedis instance
+     */
+    private Jedis getInstance() {
+        return jedisSentinelPool != null
+            ? jedisSentinelPool.getResource()
+            : jedisPool.getResource();
+    }
+
+    /**
+     * Closes the given Jedis instance once a command has finished. A {@code null} argument is
+     * accepted and ignored; a failure while closing is logged and not propagated.
+     *
+     * @param jedis The jedis instance
+     */
+    private void releaseInstance(final Jedis jedis) {
+        if (jedis != null) {
+            try {
+                jedis.close();
+            } catch (Exception closeFailure) {
+                LOG.error("Failed to close (return) instance to pool", closeFailure);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
new file mode 100644
index 0000000..b0661c7
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available commands for Redis. Each command belongs to a {@link RedisDataType} group.
+ */
+public enum RedisCommand {
+
+    /**
+     * Insert the specified value at the head of the list stored at key.
+     * If key does not exist, it is created as empty list before performing the push operation.
+     */
+    LPUSH(RedisDataType.LIST),
+
+    /**
+     * Insert the specified value at the tail of the list stored at key.
+     * If key does not exist, it is created as empty list before performing the push operation.
+     */
+    RPUSH(RedisDataType.LIST),
+
+    /**
+     * Add the specified member to the set stored at key.
+     * Specified member that is already a member of this set is ignored.
+     */
+    SADD(RedisDataType.SET),
+
+    /**
+     * Set key to hold the string value. If key already holds a value,
+     * it is overwritten, regardless of its type.
+     */
+    SET(RedisDataType.STRING),
+
+    /**
+     * Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument.
+     */
+    PFADD(RedisDataType.HYPER_LOG_LOG),
+
+    /**
+     * Posts a message to the given channel.
+     */
+    PUBLISH(RedisDataType.PUBSUB),
+
+    /**
+     * Adds the specified members with the specified score to the sorted set stored at key.
+     */
+    ZADD(RedisDataType.SORTED_SET),
+
+    /**
+     * Sets field in the hash stored at key to value. If key does not exist,
+     * a new key holding a hash is created. If field already exists in the hash, it is overwritten.
+     */
+    HSET(RedisDataType.HASH);
+
+    /**
+     * The {@link RedisDataType} this command belongs to. Final, because enum constants are shared
+     * singletons and must not carry mutable state.
+     */
+    private final RedisDataType redisDataType;
+
+    RedisCommand(RedisDataType redisDataType) {
+        this.redisDataType = redisDataType;
+    }
+
+    /**
+     * The {@link RedisDataType} this command belongs to.
+     *
+     * @return the {@link RedisDataType}
+     */
+    public RedisDataType getRedisDataType() {
+        return redisDataType;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
new file mode 100644
index 0000000..1eea48a
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * The description of the command type. This must be passed while creating new {@link RedisMapper}.
+ *
+ * <p>When creating a descriptor for a command in the group {@link RedisDataType#HASH} or
+ * {@link RedisDataType#SORTED_SET}, use the constructor {@link #RedisCommandDescription(RedisCommand, String)}.
+ * If the {@code additionalKey} is {@code null} it will throw {@code IllegalArgumentException}.
+ *
+ * <p>When the {@link RedisCommand} is not in the group {@link RedisDataType#HASH} or
+ * {@link RedisDataType#SORTED_SET}, the constructor {@link #RedisCommandDescription(RedisCommand)} can be used.
+ */
+public class RedisCommandDescription implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The command this description stands for; never {@code null}. */
+    private final RedisCommand redisCommand;
+
+    /**
+     * This additional key is needed for the groups {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
+     * Other {@link RedisDataType}s work with only two variables, i.e. name of the list and value to be added.
+     * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables.
+     * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element.
+     * {@link #getAdditionalKey()} is used as hash name for {@link RedisDataType#HASH}.
+     * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and its score.
+     * {@link #getAdditionalKey()} is used as set name for {@link RedisDataType#SORTED_SET}.
+     */
+    private final String additionalKey;
+
+    /**
+     * Use this constructor when data type is {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}.
+     * For any other data type the {@code additionalKey} is stored as given and may be {@code null}.
+     *
+     * @param redisCommand the redis command type {@link RedisCommand}
+     * @param additionalKey additional key for Hash and Sorted set data type
+     * @throws NullPointerException if {@code redisCommand} is {@code null}
+     * @throws IllegalArgumentException if the command belongs to {@link RedisDataType#HASH} or
+     *                                  {@link RedisDataType#SORTED_SET} and {@code additionalKey} is {@code null}
+     */
+    public RedisCommandDescription(RedisCommand redisCommand, String additionalKey) {
+        Preconditions.checkNotNull(redisCommand, "Redis command type can not be null");
+
+        // Validate before assigning so that a rejected argument never reaches a field.
+        final RedisDataType dataType = redisCommand.getRedisDataType();
+        final boolean needsAdditionalKey = dataType == RedisDataType.HASH || dataType == RedisDataType.SORTED_SET;
+        if (needsAdditionalKey && additionalKey == null) {
+            throw new IllegalArgumentException("Hash and Sorted Set should have additional key");
+        }
+
+        this.redisCommand = redisCommand;
+        this.additionalKey = additionalKey;
+    }
+
+    /**
+     * Use this constructor when command type is not in group {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}.
+     *
+     * @param redisCommand the redis command type {@link RedisCommand}
+     * @throws NullPointerException if {@code redisCommand} is {@code null}
+     * @throws IllegalArgumentException if the command belongs to {@link RedisDataType#HASH} or
+     *                                  {@link RedisDataType#SORTED_SET}, which require an additional key
+     */
+    public RedisCommandDescription(RedisCommand redisCommand) {
+        this(redisCommand, null);
+    }
+
+    /**
+     * Returns the {@link RedisCommand}.
+     *
+     * @return the command type of the mapping
+     */
+    public RedisCommand getCommand() {
+        return redisCommand;
+    }
+
+    /**
+     * Returns the additional key, which is required when the data type is {@link RedisDataType#HASH}
+     * or {@link RedisDataType#SORTED_SET}.
+     *
+     * @return the additional key, possibly {@code null} for other data types
+     */
+    public String getAdditionalKey() {
+        return additionalKey;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
new file mode 100644
index 0000000..6e3997c
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available data types for Redis.
+ */
+public enum RedisDataType {
+
+ /**
+ * Strings are the most basic kind of Redis value. Redis Strings are binary safe,
+ * which means that a Redis string can contain any kind of data, for instance a JPEG image or a serialized Ruby object.
+ * A String value can be at most 512 Megabytes in length.
+ */
+ STRING,
+
+ /**
+ * Redis Hashes are maps between string fields and string values.
+ */
+ HASH,
+
+ /**
+ * Redis Lists are simply lists of strings, sorted by insertion order.
+ */
+ LIST,
+
+ /**
+ * Redis Sets are an unordered collection of Strings.
+ */
+ SET,
+
+ /**
+ * Redis Sorted Sets are, similarly to Redis Sets, non-repeating collections of Strings.
+ * The difference is that every member of a Sorted Set is associated with a score,
+ * which is used to keep the sorted set ordered, from the smallest to the greatest score.
+ * While members are unique, scores may be repeated.
+ */
+ SORTED_SET,
+
+ /**
+ * HyperLogLog is a probabilistic data structure used in order to count unique things.
+ */
+ HYPER_LOG_LOG,
+
+ /**
+ * Redis implementation of the publish and subscribe paradigm. Published messages are categorized into channels,
+ * without knowledge of what (if any) subscribers there may be.
+ * Subscribers express interest in one or more channels, and only receive messages
+ * that are of interest, without knowledge of what (if any) publishers there are.
+ */
+ PUBSUB
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
new file mode 100644
index 0000000..63fed19
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Function that creates the description of how the input data should be mapped to a Redis command.
+ *<p>Example:
+ *<pre>{@code
+ *private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> {
+ * public RedisCommandDescription getCommandDescription() {
+ * return new RedisCommandDescription(RedisCommand.PUBLISH);
+ * }
+ * public String getKeyFromData(Tuple2<String, String> data) {
+ * return data.f0;
+ * }
+ * public String getValueFromData(Tuple2<String, String> data) {
+ * return data.f1;
+ * }
+ *}
+ *}</pre>
+ *
+ * @param <T> The type of the element handled by this {@code RedisMapper}
+ */
+public interface RedisMapper<T> extends Function, Serializable {
+
+ /**
+ * Returns the descriptor which defines the Redis command to use.
+ *
+ * @return command descriptor
+ */
+ RedisCommandDescription getCommandDescription();
+
+ /**
+ * Extracts key from data.
+ *
+ * @param data source data
+ * @return key
+ */
+ String getKeyFromData(T data);
+
+ /**
+ * Extracts value from data.
+ *
+ * @param data source data
+ * @return value
+ */
+ String getValueFromData(T data);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
new file mode 100644
index 0000000..7d98f2d
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import redis.embedded.RedisServer;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.NetUtils.getAvailablePort;
+
+/**
+ * Base class for Redis integration tests. Starts an embedded Redis server on {@link #REDIS_HOST}
+ * and {@link #REDIS_PORT} before the tests of a class run and stops it after they have finished.
+ */
+public abstract class RedisITCaseBase extends StreamingMultipleProgramsTestBase {
+
+    /** Port for the embedded server, obtained once when this class is initialized. */
+    public static final int REDIS_PORT = getAvailablePort();
+
+    /** Host the tests use to reach the embedded server. */
+    public static final String REDIS_HOST = "127.0.0.1";
+
+    private static RedisServer redisServer;
+
+    /**
+     * Creates and starts the embedded Redis server.
+     */
+    @BeforeClass
+    public static void createRedisServer() throws IOException, InterruptedException {
+        redisServer = new RedisServer(REDIS_PORT);
+        redisServer.start();
+    }
+
+    /**
+     * Stops the embedded Redis server if one was created.
+     */
+    @AfterClass
+    public static void stopRedisServer() {
+        // JUnit runs @AfterClass methods even when @BeforeClass threw; without this check a
+        // failed server creation would be masked by a NullPointerException here.
+        if (redisServer != null) {
+            redisServer.stop();
+            redisServer = null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
new file mode 100644
index 0000000..dc59ba4
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisSentinelPool;
+import redis.embedded.RedisCluster;
+import redis.embedded.util.JedisUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.flink.util.NetUtils.getAvailablePort;
+
+/**
+ * Verifies that a {@link RedisCommandsContainer} built from a {@link FlinkJedisSentinelConfig}
+ * can write to an embedded Redis cluster that is fronted by sentinels.
+ */
+public class RedisSentinelClusterTest extends TestLogger {
+
+	private static RedisCluster cluster;
+	private static final String REDIS_MASTER = "master";
+	private static final String TEST_KEY = "testKey";
+	private static final String TEST_VALUE = "testValue";
+	private static final List<Integer> sentinels = Arrays.asList(getAvailablePort(), getAvailablePort());
+	private static final List<Integer> group1 = Arrays.asList(getAvailablePort(), getAvailablePort());
+
+	private JedisSentinelPool jedisSentinelPool;
+	private FlinkJedisSentinelConfig jedisSentinelConfig;
+
+	/** Builds and starts the embedded cluster (sentinels plus one replication group). */
+	@BeforeClass
+	public static void setUpCluster() {
+		cluster = RedisCluster.builder().sentinelPorts(sentinels).quorumSize(1)
+			.serverPorts(group1).replicationGroup(REDIS_MASTER, 1)
+			.build();
+		cluster.start();
+	}
+
+	/** Creates the sentinel config under test and a plain Jedis pool used for verification. */
+	@Before
+	public void setUp() {
+		Set<String> hosts = JedisUtil.sentinelHosts(cluster);
+		jedisSentinelConfig = new FlinkJedisSentinelConfig.Builder().setMasterName(REDIS_MASTER)
+			.setSentinels(hosts).build();
+		jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
+			jedisSentinelConfig.getSentinels());
+	}
+
+	/** Writes a value through the container and reads it back through the verification pool. */
+	@Test
+	public void testRedisSentinelOperation() {
+		RedisCommandsContainer redisContainer = RedisCommandsContainerBuilder.build(jedisSentinelConfig);
+		Jedis jedis = null;
+		try {
+			jedis = jedisSentinelPool.getResource();
+			redisContainer.set(TEST_KEY, TEST_VALUE);
+			assertEquals(TEST_VALUE, jedis.get(TEST_KEY));
+		} finally {
+			if (jedis != null) {
+				jedis.close();
+			}
+		}
+	}
+
+	/** Releases the verification pool created in {@link #setUp()}. */
+	@After
+	public void tearDown() throws IOException {
+		if (jedisSentinelPool != null) {
+			jedisSentinelPool.close();
+		}
+	}
+
+	/** Stops the embedded cluster started in {@link #setUpCluster()}. */
+	@AfterClass
+	public static void tearDownCluster() throws IOException {
+		// Stop whenever a cluster object exists. Guarding on "not active" would skip the
+		// shutdown for a running cluster and leave its server processes behind.
+		// NOTE(review): assumes stop() is harmless for instances that are not running - confirm
+		// against the embedded-redis version in use.
+		if (cluster != null) {
+			cluster.stop();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
new file mode 100644
index 0000000..21f3cca
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import static org.junit.Assert.assertEquals;
+
+public class RedisSinkITCase extends RedisITCaseBase {
+
+ private FlinkJedisPoolConfig jedisPoolConfig;
+ private static final Long NUM_ELEMENTS = 20L;
+ private static final String REDIS_KEY = "TEST_KEY";
+ private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY";
+
+ StreamExecutionEnvironment env;
+
+
+ private Jedis jedis;
+
+ @Before
+ public void setUp(){
+ jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
+ .setHost(REDIS_HOST)
+ .setPort(REDIS_PORT).build();
+ jedis = new Jedis(REDIS_HOST, REDIS_PORT);
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ }
+
+ @Test
+ public void testRedisListDataType() throws Exception {
+ DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+ RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+ new RedisCommandMapper(RedisCommand.LPUSH));
+
+ source.addSink(redisSink);
+ env.execute("Test Redis List Data Type");
+
+ assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+
+ jedis.del(REDIS_KEY);
+ }
+
+ @Test
+ public void testRedisSetDataType() throws Exception {
+ DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+ RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+ new RedisCommandMapper(RedisCommand.SADD));
+
+ source.addSink(redisSink);
+ env.execute("Test Redis Set Data Type");
+
+ assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+
+ jedis.del(REDIS_KEY);
+ }
+
+ @Test
+ public void testRedisHyperLogLogDataType() throws Exception {
+ DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+ RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+ new RedisCommandMapper(RedisCommand.PFADD));
+
+ source.addSink(redisSink);
+ env.execute("Test Redis Hyper Log Log Data Type");
+
+ assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY)));
+
+ jedis.del(REDIS_KEY);
+ }
+
+ @Test
+ public void testRedisSortedSetDataType() throws Exception {
+ DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionSortedSet());
+ RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+ new RedisAdditionalDataMapper(RedisCommand.ZADD));
+
+ source.addSink(redisSink);
+ env.execute("Test Redis Sorted Set Data Type");
+
+ assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY));
+
+ jedis.del(REDIS_ADDITIONAL_KEY);
+ }
+
+ @Test
+ public void testRedisHashDataType() throws Exception {
+ DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionHash());
+ RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+ new RedisAdditionalDataMapper(RedisCommand.HSET));
+
+ source.addSink(redisSink);
+ env.execute("Test Redis Hash Data Type");
+
+ assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
+
+ jedis.del(REDIS_ADDITIONAL_KEY);
+ }
+
+ @After
+ public void tearDown(){
+ if(jedis != null){
+ jedis.close();
+ }
+ }
+
+ private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> {
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean running = true;
+
+ @Override
+ public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+ for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+ ctx.collect(new Tuple2<>(REDIS_KEY, "message #" + i));
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+
+ private static class TestSourceFunctionHash implements SourceFunction<Tuple2<String, String>> {
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean running = true;
+
+ @Override
+ public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+ for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+ ctx.collect(new Tuple2<>("" + i, "message #" + i));
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+
+ private static class TestSourceFunctionSortedSet implements SourceFunction<Tuple2<String, String>> {
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean running = true;
+
+ @Override
+ public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+ for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+ ctx.collect(new Tuple2<>( "message #" + i, "" + i));
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+
+ public static class RedisCommandMapper implements RedisMapper<Tuple2<String, String>>{
+
+ private RedisCommand redisCommand;
+
+ public RedisCommandMapper(RedisCommand redisCommand){
+ this.redisCommand = redisCommand;
+ }
+
+ @Override
+ public RedisCommandDescription getCommandDescription() {
+ return new RedisCommandDescription(redisCommand);
+ }
+
+ @Override
+ public String getKeyFromData(Tuple2<String, String> data) {
+ return data.f0;
+ }
+
+ @Override
+ public String getValueFromData(Tuple2<String, String> data) {
+ return data.f1;
+ }
+ }
+
+ public static class RedisAdditionalDataMapper implements RedisMapper<Tuple2<String, String>>{
+
+ private RedisCommand redisCommand;
+
+ public RedisAdditionalDataMapper(RedisCommand redisCommand){
+ this.redisCommand = redisCommand;
+ }
+
+ @Override
+ public RedisCommandDescription getCommandDescription() {
+ return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY);
+ }
+
+ @Override
+ public String getKeyFromData(Tuple2<String, String> data) {
+ return data.f0;
+ }
+
+ @Override
+ public String getValueFromData(Tuple2<String, String> data) {
+ return data.f1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
new file mode 100644
index 0000000..caf3945
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.JedisPool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+import redis.clients.jedis.JedisPubSub;
+
+import static org.junit.Assert.assertEquals;
+
+public class RedisSinkPublishITCase extends RedisITCaseBase {
+
+ private static final int NUM_ELEMENTS = 20;
+ private static final String REDIS_CHANNEL = "CHANNEL";
+
+ private static final List<String> sourceList = new ArrayList<>();
+ private Thread sinkThread;
+ private PubSub pubSub;
+
+ @Before
+ public void before() throws Exception {
+ pubSub = new PubSub();
+ sinkThread = new Thread(new Subscribe(pubSub));
+ }
+
+ @Test
+ public void redisSinkTest() throws Exception {
+ sinkThread.start();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
+ .setHost(REDIS_HOST)
+ .setPort(REDIS_PORT).build();
+ DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+
+ RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, new RedisTestMapper());
+
+ source.addSink(redisSink);
+
+ env.execute("Redis Sink Test");
+
+ assertEquals(NUM_ELEMENTS, sourceList.size());
+ }
+
+ @After
+ public void after() throws Exception {
+ pubSub.unsubscribe();
+ sinkThread.join();
+ sourceList.clear();
+ }
+
+ private class Subscribe implements Runnable {
+ private PubSub localPubSub;
+ private Subscribe(PubSub pubSub){
+ this.localPubSub = pubSub;
+ }
+
+ @Override
+ public void run() {
+ JedisPool pool = new JedisPool(REDIS_HOST, REDIS_PORT);
+ pool.getResource().subscribe(localPubSub, REDIS_CHANNEL);
+ }
+ }
+
+ private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> {
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean running = true;
+
+ @Override
+ public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+ for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+ ctx.collect(new Tuple2<>(REDIS_CHANNEL, "message #" + i));
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+
+ public static class PubSub extends JedisPubSub {
+
+ @Override
+ public void onMessage(String channel, String message) {
+ sourceList.add(message);
+ }
+
+ }
+
+ private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>>{
+
+ @Override
+ public RedisCommandDescription getCommandDescription() {
+ return new RedisCommandDescription(RedisCommand.PUBLISH);
+ }
+
+ @Override
+ public String getKeyFromData(Tuple2<String, String> data) {
+ return data.f0;
+ }
+
+ @Override
+ public String getValueFromData(Tuple2<String, String> data) {
+ return data.f1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
new file mode 100644
index 0000000..59f59f2
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for the argument checks of the {@link RedisSink} constructor and for the way
+ * {@code open()} behaves when no Redis instance is reachable.
+ */
+public class RedisSinkTest extends TestLogger {
+
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointExceptionIfDataMapperIsNull() {
+		// Use a config that builds successfully. A cluster config builder without nodes throws
+		// an NPE by itself (see JedisClusterConfigTest), which would satisfy the expectation
+		// before the RedisSink constructor is ever invoked.
+		FlinkJedisPoolConfig validConfig = new FlinkJedisPoolConfig.Builder()
+			.setHost("127.0.0.1")
+			.setPort(1234).build();
+
+		new RedisSink<>(validConfig, null);
+	}
+
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointerExceptionIfCommandDescriptionIsNull() {
+		// Same reasoning as above: the NPE must come from RedisSink, not from the config builder.
+		FlinkJedisPoolConfig validConfig = new FlinkJedisPoolConfig.Builder()
+			.setHost("127.0.0.1")
+			.setPort(1234).build();
+
+		new RedisSink<>(validConfig, new TestMapper(null));
+	}
+
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointerExceptionIfConfigurationIsNull() {
+		new RedisSink<>(null, new TestMapper(new RedisCommandDescription(RedisCommand.LPUSH)));
+	}
+
+	@Test
+	public void testRedisDownBehavior() throws Exception {
+
+		// create a wrong configuration so that open() fails.
+
+		FlinkJedisPoolConfig wrongJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
+			.setHost("127.0.0.1")
+			.setPort(1234).build();
+
+		testDownBehavior(wrongJedisPoolConfig);
+	}
+
+	@Test
+	public void testRedisClusterDownBehavior() throws Exception {
+
+		Set<InetSocketAddress> hosts = new HashSet<>();
+		hosts.add(new InetSocketAddress("127.0.0.1", 1234));
+
+		// create a wrong configuration so that open() fails.
+
+		FlinkJedisClusterConfig wrongJedisClusterConfig = new FlinkJedisClusterConfig.Builder()
+			.setNodes(hosts)
+			.setTimeout(100)
+			.setMaxIdle(1)
+			.setMaxTotal(1)
+			.setMinIdle(1).build();
+
+		testDownBehavior(wrongJedisClusterConfig);
+	}
+
+	@Test
+	public void testRedisSentinelDownBehavior() throws Exception {
+
+		Set<String> hosts = new HashSet<>();
+		hosts.add("localhost:55095");
+
+		// create a wrong configuration so that open() fails.
+
+		FlinkJedisSentinelConfig wrongJedisSentinelConfig = new FlinkJedisSentinelConfig.Builder()
+			.setMasterName("master")
+			.setSentinels(hosts)
+			.build();
+
+		testDownBehavior(wrongJedisSentinelConfig);
+	}
+
+	/**
+	 * Opens a sink with the given configuration. If {@code open()} throws, the exception is
+	 * accepted only when a {@link JedisConnectionException} is found in its cause chain
+	 * (searched to a bounded depth); any other exception is rethrown.
+	 *
+	 * <p>NOTE(review): a call to {@code open()} that does not throw at all also passes here;
+	 * confirm whether that is intended for every configuration type before tightening it.
+	 *
+	 * @param config connection configuration pointing at an address where no Redis is expected
+	 * @throws Exception the exception thrown by {@code open()} if it is not connection related
+	 */
+	private void testDownBehavior(FlinkJedisConfigBase config) throws Exception {
+		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(config,
+			new RedisSinkITCase.RedisCommandMapper(RedisCommand.SADD));
+
+		try {
+			redisSink.open(new Configuration());
+		} catch (Exception e) {
+
+			// search for nested JedisConnectionExceptions
+			// because this is the expected behavior
+
+			Throwable t = e;
+			int depth = 0;
+			while (!(t instanceof JedisConnectionException)) {
+				t = t.getCause();
+				if (t == null || depth++ == 20) {
+					throw e;
+				}
+			}
+		}
+	}
+
+	/** Mapper that hands back whatever command description it was constructed with. */
+	private static class TestMapper implements RedisMapper<Tuple2<String, String>> {
+		private final RedisCommandDescription redisCommandDescription;
+
+		public TestMapper(RedisCommandDescription redisCommandDescription) {
+			this.redisCommandDescription = redisCommandDescription;
+		}
+
+		@Override
+		public RedisCommandDescription getCommandDescription() {
+			return redisCommandDescription;
+		}
+
+		@Override
+		public String getKeyFromData(Tuple2<String, String> data) {
+			return data.f0;
+		}
+
+		@Override
+		public String getValueFromData(Tuple2<String, String> data) {
+			return data.f1;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
new file mode 100644
index 0000000..ed1d713
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+/**
+ * Checks that the {@link FlinkJedisConfigBase} constructor rejects negative settings.
+ */
+public class FlinkJedisConfigBaseTest extends TestLogger {
+
+	/** A negative connection timeout must be refused. */
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowIllegalArgumentExceptionIfTimeOutIsNegative() {
+		final int negativeTimeout = -1;
+		new TestConfig(negativeTimeout, 0, 0, 0);
+	}
+
+	/** A negative maxTotal must be refused. */
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowIllegalArgumentExceptionIfMaxTotalIsNegative() {
+		final int negativeMaxTotal = -1;
+		new TestConfig(1, negativeMaxTotal, 0, 0);
+	}
+
+	/** A negative maxIdle must be refused. */
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowIllegalArgumentExceptionIfMaxIdleIsNegative() {
+		final int negativeMaxIdle = -1;
+		new TestConfig(0, 0, negativeMaxIdle, 0);
+	}
+
+	/** A negative minIdle must be refused. */
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowIllegalArgumentExceptionIfMinIdleIsNegative() {
+		final int negativeMinIdle = -1;
+		new TestConfig(0, 0, 0, negativeMinIdle);
+	}
+
+	/** Minimal subclass that only forwards its arguments to the base constructor. */
+	private static class TestConfig extends FlinkJedisConfigBase {
+
+		protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle) {
+			super(connectionTimeout, maxTotal, maxIdle, minIdle);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
new file mode 100644
index 0000000..40db578
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Checks the node validation performed when building a {@link FlinkJedisClusterConfig}.
+ */
+public class JedisClusterConfigTest extends TestLogger {
+
+	/** Building without ever setting the nodes is expected to fail with an NPE. */
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointExceptionIfNodeValueIsNull() {
+		new FlinkJedisClusterConfig.Builder()
+			.setMinIdle(0)
+			.setMaxIdle(0)
+			.setMaxTotal(0)
+			.setTimeout(0)
+			.build();
+	}
+
+	/** Building with an empty node set is expected to fail with an IllegalArgumentException. */
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowIllegalArgumentExceptionIfNodeValuesAreEmpty() {
+		Set<InetSocketAddress> noNodes = new HashSet<>();
+		new FlinkJedisClusterConfig.Builder()
+			.setMinIdle(0)
+			.setMaxIdle(0)
+			.setMaxTotal(0)
+			.setTimeout(0)
+			.setNodes(noNodes)
+			.build();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
new file mode 100644
index 0000000..dc16cfe
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+/**
+ * Checks the host validation performed when building a {@link FlinkJedisPoolConfig}.
+ */
+public class JedisPoolConfigTest extends TestLogger {
+
+	/** Building without ever setting a host is expected to fail with an NPE. */
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointExceptionIfHostValueIsNull() {
+		new FlinkJedisPoolConfig.Builder().build();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
new file mode 100644
index 0000000..8445fae
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Checks the master-name and sentinel validation performed when building a
+ * {@link FlinkJedisSentinelConfig}.
+ */
+public class JedisSentinelConfigTest extends TestLogger {
+
+	public static final String MASTER_NAME = "test-master";
+
+	/** Sentinels given but no master name: expected to fail with an NPE. */
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointExceptionIfMasterValueIsNull() {
+		FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder();
+		Set<String> sentinels = new HashSet<>();
+		sentinels.add("127.0.0.1");
+		builder.setSentinels(sentinels).build();
+	}
+
+	/** Master name given but sentinels never set: expected to fail with an NPE. */
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointExceptionIfSentinelsValueIsNull() {
+		FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder();
+		builder.setMasterName(MASTER_NAME).build();
+	}
+
+	/**
+	 * Master name given with an empty sentinel set: expected to fail with an
+	 * IllegalArgumentException. The method name states the exception that is actually
+	 * asserted (it previously claimed a NullPointerException).
+	 */
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowIllegalArgumentExceptionIfSentinelsValueIsEmpty() {
+		FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder();
+		Set<String> sentinels = new HashSet<>();
+		builder.setMasterName(MASTER_NAME).setSentinels(sentinels).build();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
new file mode 100644
index 0000000..b0eee48
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.streaming.connectors.redis.RedisSinkITCase;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests the command description produced by {@link RedisSinkITCase.RedisCommandMapper}
+ * for commands with and without an additional key.
+ */
+public class RedisDataTypeDescriptionTest extends TestLogger {
+
+    /** Creating an HSET mapper and requesting its description is expected to raise an IllegalArgumentException. */
+    @Test(expected=IllegalArgumentException.class)
+    public void shouldThrowExceptionIfAdditionalKeyIsNotGivenForHashDataType(){
+        new RedisSinkITCase.RedisCommandMapper(RedisCommand.HSET).getCommandDescription();
+    }
+
+    /** For LPUSH the description reports the LIST data type and carries no additional key. */
+    @Test
+    public void shouldReturnNullForAdditionalDataType(){
+        RedisSinkITCase.RedisCommandMapper listMapper =
+            new RedisSinkITCase.RedisCommandMapper(RedisCommand.LPUSH);
+        RedisCommandDescription description = listMapper.getCommandDescription();
+
+        RedisDataType actualType = description.getCommand().getRedisDataType();
+        assertEquals(RedisDataType.LIST, actualType);
+        assertNull(description.getAdditionalKey());
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-twitter/pom.xml b/flink-connectors/flink-connector-twitter/pom.xml
new file mode 100644
index 0000000..27a966f
--- /dev/null
+++ b/flink-connectors/flink-connector-twitter/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connectors</artifactId>
+ <version>1.2-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-twitter_2.10</artifactId>
+ <name>flink-connector-twitter</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <hbc-core.version>2.2.0</hbc-core.version>
+ </properties>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.10</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>hbc-core</artifactId>
+ <version>${hbc-core.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <configuration>
+ <artifactSet>
+ <includes combine.children="append">
+ <!-- We include all dependencies that transitively depend on guava -->
+ <include>com.twitter:hbc-core</include>
+ <include>com.twitter:joauth</include>
+ <include>org.apache.httpcomponents:httpclient</include>
+ <include>org.apache.httpcomponents:httpcore</include>
+ </includes>
+ </artifactSet>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
new file mode 100644
index 0000000..66fa237
--- /dev/null
+++ b/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.Properties;
+
+import com.twitter.hbc.common.DelimitedStreamReader;
+import com.twitter.hbc.core.endpoint.StreamingEndpoint;
+import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.hbc.ClientBuilder;
+import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.httpclient.BasicClient;
+import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.OAuth1;
+
+/**
+ * Implementation of {@link SourceFunction} specialized to emit tweets from
+ * Twitter. This is not a parallel source because the Twitter API only allows
+ * two concurrent connections.
+ */
+public class TwitterSource extends RichSourceFunction<String> implements StoppableFunction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
+
+ private static final long serialVersionUID = 1L;
+
+ // ----- Required property keys
+
+ public static final String CONSUMER_KEY = "twitter-source.consumerKey";
+
+ public static final String CONSUMER_SECRET = "twitter-source.consumerSecret";
+
+ public static final String TOKEN = "twitter-source.token";
+
+ public static final String TOKEN_SECRET = "twitter-source.tokenSecret";
+
+ // ------ Optional property keys
+
+ public static final String CLIENT_NAME = "twitter-source.name";
+
+ public static final String CLIENT_HOSTS = "twitter-source.hosts";
+
+ public static final String CLIENT_BUFFER_SIZE = "twitter-source.bufferSize";
+
+ // ----- Fields set by the constructor
+
+ private final Properties properties;
+
+ private EndpointInitializer initializer = new SampleStatusesEndpoint();
+
+ // ----- Runtime fields
+ private transient BasicClient client;
+ private transient Object waitLock;
+ private transient boolean running = true;
+
+
+ /**
+ * Create {@link TwitterSource} for streaming
+ *
+ * @param properties For the source
+ */
+ public TwitterSource(Properties properties) {
+ checkProperty(properties, CONSUMER_KEY);
+ checkProperty(properties, CONSUMER_SECRET);
+ checkProperty(properties, TOKEN);
+ checkProperty(properties, TOKEN_SECRET);
+
+ this.properties = properties;
+ }
+
+ private static void checkProperty(Properties p, String key) {
+ if(!p.containsKey(key)) {
+ throw new IllegalArgumentException("Required property '" + key + "' not set.");
+ }
+ }
+
+
+ /**
+ * Set a custom endpoint initializer.
+ */
+ public void setCustomEndpointInitializer(EndpointInitializer initializer) {
+ Objects.requireNonNull(initializer, "Initializer has to be set");
+ ClosureCleaner.ensureSerializable(initializer);
+ this.initializer = initializer;
+ }
+
+ // ----- Source lifecycle
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ waitLock = new Object();
+ }
+
+
+ @Override
+ public void run(final SourceContext<String> ctx) throws Exception {
+ LOG.info("Initializing Twitter Streaming API connection");
+
+ StreamingEndpoint endpoint = initializer.createEndpoint();
+
+ Authentication auth = new OAuth1(properties.getProperty(CONSUMER_KEY),
+ properties.getProperty(CONSUMER_SECRET),
+ properties.getProperty(TOKEN),
+ properties.getProperty(TOKEN_SECRET));
+
+ client = new ClientBuilder()
+ .name(properties.getProperty(CLIENT_NAME, "flink-twitter-source"))
+ .hosts(properties.getProperty(CLIENT_HOSTS, Constants.STREAM_HOST))
+ .endpoint(endpoint)
+ .authentication(auth)
+ .processor(new HosebirdMessageProcessor() {
+ public DelimitedStreamReader reader;
+
+ @Override
+ public void setup(InputStream input) {
+ reader = new DelimitedStreamReader(input, Constants.DEFAULT_CHARSET, Integer.parseInt(properties.getProperty(CLIENT_BUFFER_SIZE, "50000")));
+ }
+
+ @Override
+ public boolean process() throws IOException, InterruptedException {
+ String line = reader.readLine();
+ ctx.collect(line);
+ return true;
+ }
+ })
+ .build();
+
+ client.connect();
+ running = true;
+
+ LOG.info("Twitter Streaming API connection established successfully");
+
+ // just wait now
+ while(running) {
+ synchronized (waitLock) {
+ waitLock.wait(100L);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ this.running = false;
+ LOG.info("Closing source");
+ if (client != null) {
+ // client seems to be thread-safe
+ client.stop();
+ }
+ // leave main method
+ synchronized (waitLock) {
+ waitLock.notify();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ LOG.info("Cancelling Twitter source");
+ close();
+ }
+
+ @Override
+ public void stop() {
+ LOG.info("Stopping Twitter source");
+ close();
+ }
+
+ // ------ Custom endpoints
+
+ /**
+ * Implementing this interface allows users of this source to set a custom endpoint.
+ */
+ public interface EndpointInitializer {
+ StreamingEndpoint createEndpoint();
+ }
+
+ /**
+ * Default endpoint initializer returning the {@see StatusesSampleEndpoint}.
+ */
+ private static class SampleStatusesEndpoint implements EndpointInitializer, Serializable {
+ @Override
+ public StreamingEndpoint createEndpoint() {
+ // this default endpoint initializer returns the sample endpoint: Returning a sample from the firehose (all tweets)
+ StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
+ endpoint.stallWarnings(false);
+ endpoint.delimited(false);
+ return endpoint;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/pom.xml b/flink-connectors/flink-hadoop-compatibility/pom.xml
new file mode 100644
index 0000000..5938560
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/pom.xml
@@ -0,0 +1,182 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connectors</artifactId>
+ <version>1.2-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-hadoop-compatibility_2.10</artifactId>
+ <name>flink-hadoop-compatibility</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <!-- core dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop2</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ </dependencies>
+
+
+ <build>
+ <plugins>
+ <!-- activate API compatibility checks -->
+ <plugin>
+ <groupId>com.github.siom79.japicmp</groupId>
+ <artifactId>japicmp-maven-plugin</artifactId>
+ </plugin>
+ <!-- Scala Compiler -->
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.1.4</version>
+ <executions>
+ <!-- Run scala compiler in the process-resources phase, so that dependencies on
+ scala classes can be resolved later in the (Java) compile phase -->
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <jvmArgs>
+ <jvmArg>-Xms128m</jvmArg>
+ <jvmArg>-Xmx512m</jvmArg>
+ </jvmArgs>
+ </configuration>
+ </plugin>
+
+ <!-- Eclipse Integration -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-eclipse-plugin</artifactId>
+ <version>2.8</version>
+ <configuration>
+ <downloadSources>true</downloadSources>
+ <projectnatures>
+ <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+ <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+ </projectnatures>
+ <buildcommands>
+ <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+ </buildcommands>
+ <classpathContainers>
+ <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+ <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+ </classpathContainers>
+ <excludes>
+ <exclude>org.scala-lang:scala-library</exclude>
+ <exclude>org.scala-lang:scala-compiler</exclude>
+ </excludes>
+ <sourceIncludes>
+ <sourceInclude>**/*.scala</sourceInclude>
+ <sourceInclude>**/*.java</sourceInclude>
+ </sourceIncludes>
+ </configuration>
+ </plugin>
+
+ <!-- Adding scala source directories to build path -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <!-- Add src/main/scala to eclipse build path -->
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Scala Code Style, most of the configuration done via plugin management -->
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ <configuration>
+ <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
new file mode 100644
index 0000000..7bcb4bf
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
+import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
+import org.apache.hadoop.io.Writable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Type information for data types that extend Hadoop's {@link Writable} interface. The Writable
+ * interface defines the serialization and deserialization routines for the data type.
+ *
+ * @param <T> The type of the class represented by this type information.
+ */
+@Public
+public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Class<T> typeClass;
+
+ @PublicEvolving
+ public WritableTypeInfo(Class<T> typeClass) {
+ this.typeClass = checkNotNull(typeClass);
+
+ checkArgument(
+ Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class),
+ "WritableTypeInfo can only be used for subclasses of %s", Writable.class.getName());
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ @PublicEvolving
+ public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
+ if(Comparable.class.isAssignableFrom(typeClass)) {
+ return new WritableComparator(sortOrderAscending, typeClass);
+ }
+ else {
+ throw new UnsupportedOperationException("Cannot create Comparator for "+typeClass.getCanonicalName()+". " +
+ "Class does not implement Comparable interface.");
+ }
+ }
+
+ @Override
+ @PublicEvolving
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ @PublicEvolving
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ @PublicEvolving
+ public int getArity() {
+ return 1;
+ }
+
+ @Override
+ @PublicEvolving
+ public int getTotalFields() {
+ return 1;
+ }
+
+ @Override
+ @PublicEvolving
+ public Class<T> getTypeClass() {
+ return this.typeClass;
+ }
+
+ @Override
+ @PublicEvolving
+ public boolean isKeyType() {
+ return Comparable.class.isAssignableFrom(typeClass);
+ }
+
+ @Override
+ @PublicEvolving
+ public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
+ return new WritableSerializer<T>(typeClass);
+ }
+
+ @Override
+ public String toString() {
+ return "WritableType<" + typeClass.getName() + ">";
+ }
+
+ @Override
+ public int hashCode() {
+ return typeClass.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof WritableTypeInfo) {
+ @SuppressWarnings("unchecked")
+ WritableTypeInfo<T> writableTypeInfo = (WritableTypeInfo<T>) obj;
+
+ return writableTypeInfo.canEqual(this) &&
+ typeClass == writableTypeInfo.typeClass;
+
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof WritableTypeInfo;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @PublicEvolving
+ static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass) {
+ if (Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class)) {
+ return new WritableTypeInfo<T>(typeClass);
+ }
+ else {
+ throw new InvalidTypesException("The given class is no subclass of " + Writable.class.getName());
+ }
+ }
+
+}