You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by subhankarb <gi...@git.apache.org> on 2016/02/03 04:28:57 UTC

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

GitHub user subhankarb opened a pull request:

    https://github.com/apache/flink/pull/1580

    [FLINK-3034][Streaming Connectors] Redis Sink Connector

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/subhankarb/flink FLINK-3034

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1580.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1580
    
----
commit 668bf80ca2a07a29310d62e7628de28d26ed09aa
Author: subhankar <su...@target.com>
Date:   2016-02-03T03:27:38Z

    [FLINK-3034][Streaming Connectors] Redis Sink Connector

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r51699024
  
    --- Diff: flink-streaming-connectors/flink-connector-redis/pom.xml ---
    @@ -0,0 +1,86 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-connectors</artifactId>
    +        <version>1.0-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +
    +    <artifactId>flink-connector-redis_2.10</artifactId>
    +    <name>flink-connector-redis</name>
    +
    +    <packaging>jar</packaging>
    +
    +    <properties>
    +        <jedis.vaersion>2.8.0</jedis.vaersion>
    +    </properties>
    --- End diff --
    
    Typo: `version`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r51699764
  
    --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.redis;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import redis.clients.jedis.Jedis;
    +import redis.clients.jedis.JedisPool;
    +import redis.clients.jedis.JedisPoolConfig;
    +import redis.clients.jedis.Protocol;
    +import redis.clients.jedis.exceptions.JedisException;
    +
    +public class RedisSink<IN>  extends RichSinkFunction<IN> {
    +
    +	private static final int DEFAULT_TIMEOUT = 2000;
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger logger = LoggerFactory.getLogger(RedisSink.class);
    +
    +	private String host;
    +	private int port;
    +	private String channel;
    +	private int timeOut;
    +	private int soTimeOut;
    +	private int database;
    +	private String password;
    +	private String clientName;
    +
    +	private SerializationSchema<IN> schema;
    +	private transient JedisPoolConfig poolConfig;
    +	private transient JedisPool jedisPool;
    +
    +
    +	public RedisSink(String host, int port,  String channel, SerializationSchema<IN> schema) {
    --- End diff --
    
    JavaDoc missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1580#issuecomment-179221148
  
    Please *rebase*  on current master instead of merging the current master into your branch. See "Preparing and submitting your contribution" in https://flink.apache.org/contribute-code.html#code-contribution-process


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r51699436
  
    --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.redis;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import redis.clients.jedis.Jedis;
    +import redis.clients.jedis.JedisPool;
    +import redis.clients.jedis.JedisPoolConfig;
    +import redis.clients.jedis.Protocol;
    +import redis.clients.jedis.exceptions.JedisException;
    +
    +public class RedisSink<IN>  extends RichSinkFunction<IN> {
    +
    +	private static final int DEFAULT_TIMEOUT = 2000;
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger logger = LoggerFactory.getLogger(RedisSink.class);
    --- End diff --
    
    Please rename `logger` to `LOG` -- this is the common name in Flink.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r52015801
  
    --- Diff: docs/apis/streaming/connectors/redis.md ---
    @@ -0,0 +1,70 @@
    +---
    +title: "Redis Connector"
    +
    +# Sub-level navigation
    +sub-nav-group: streaming
    +sub-nav-parent: connectors
    +sub-nav-pos: 4
    --- End diff --
    
    rabbitmq.md has also sub-nav-pos 4 (I guess this will result in an conflict). did you build the documentation and checked it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by subhankarb <gi...@git.apache.org>.
Github user subhankarb commented on the pull request:

    https://github.com/apache/flink/pull/1580#issuecomment-187546037
  
    hi matthias,
       i was on vacation for 1 week. I'll update the pr once i get some input
    from robert about my design.
    On 22-Feb-2016 7:21 PM, "Robert Metzger" <no...@github.com> wrote:
    
    > Sorry, I'm currently very busy with the 1.0 release. I hope I'll find some
    > time to look into this PR again later this week.
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/flink/pull/1580#issuecomment-187184409>.
    >



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1580#issuecomment-180362938
  
    Just a minor comment about the documentation. Otherwise, looks good to me.
    @rmetzger can you review this, too? (You are familiar with the connectors in general -- I doubt that we have a Redis expert as a commiter, do we?)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r51699225
  
    --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.redis;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import redis.clients.jedis.Jedis;
    +import redis.clients.jedis.JedisPool;
    +import redis.clients.jedis.JedisPoolConfig;
    +import redis.clients.jedis.Protocol;
    +import redis.clients.jedis.exceptions.JedisException;
    +
    +public class RedisSink<IN>  extends RichSinkFunction<IN> {
    --- End diff --
    
    Missing JavaDoc.
    Remove double-whitespace.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r51699767
  
    --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.redis;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import redis.clients.jedis.Jedis;
    +import redis.clients.jedis.JedisPool;
    +import redis.clients.jedis.JedisPoolConfig;
    +import redis.clients.jedis.Protocol;
    +import redis.clients.jedis.exceptions.JedisException;
    +
    +public class RedisSink<IN>  extends RichSinkFunction<IN> {
    +
    +	private static final int DEFAULT_TIMEOUT = 2000;
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger logger = LoggerFactory.getLogger(RedisSink.class);
    +
    +	private String host;
    +	private int port;
    +	private String channel;
    +	private int timeOut;
    +	private int soTimeOut;
    +	private int database;
    +	private String password;
    +	private String clientName;
    +
    +	private SerializationSchema<IN> schema;
    +	private transient JedisPoolConfig poolConfig;
    +	private transient JedisPool jedisPool;
    +
    +
    +	public RedisSink(String host, int port,  String channel, SerializationSchema<IN> schema) {
    +		this(host, port, channel, schema, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE, null);
    +	}
    +
    +	public RedisSink(String host, int port, String channel, SerializationSchema<IN> schema, int timeOut, int soTimeOut, String password, int database, String clientName) {
    --- End diff --
    
    JavaDoc missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r52016008
  
    --- Diff: docs/apis/streaming/connectors/redis.md ---
    @@ -0,0 +1,70 @@
    +---
    +title: "Redis Connector"
    +
    +# Sub-level navigation
    +sub-nav-group: streaming
    +sub-nav-parent: connectors
    +sub-nav-pos: 4
    +sub-nav-title: Redis
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +This connector provides access to data streams from [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the following dependency to your project:
    +
    +{% highlight xml %}
    +<dependency>
    +  <groupId>org.apache.flink</groupId>
    +  <artifactId>flink-connector-redis{{ site.scala_version_suffix }}</artifactId>
    +  <version>{{site.version }}</version>
    +</dependency>
    +{% endhighlight %}
    +
    +Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
    +
    +#### Installing Redis
    +Follow the instructions from the [Redis download page](http://redis.io/download).
    +
    +#### Redis Sink
    +A class providing an interface for sending data to Redis. It internally sends data to redis
    +channel using Redis PUBLISH command
    +
    +The followings have to be provided for the `RedisSink(…)` constructor in order:
    +
    +1. The hostname
    +2. The port number
    +3. The channel name
    +4. Serialization schema
    +
    +Example:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +stream.addSink(new RedisSink<String>("localhost", 6379, "hello", new SimpleStringSchema()));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +stream.addSink(new RedisSink[String]("localhost", 6379, "hello", new SimpleStringSchema))
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +More about Redis can be found [here](http://redis.io/).
    --- End diff --
    
    `[here](...)` is not the best style. I would go for: `More about Redis can be found on the [Redis web page](...).`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r51699060
  
    --- Diff: flink-streaming-connectors/flink-connector-redis/pom.xml ---
    @@ -0,0 +1,86 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-connectors</artifactId>
    +        <version>1.0-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +
    +    <artifactId>flink-connector-redis_2.10</artifactId>
    +    <name>flink-connector-redis</name>
    +
    +    <packaging>jar</packaging>
    +
    +    <properties>
    +        <jedis.vaersion>2.8.0</jedis.vaersion>
    +    </properties>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-java_2.10</artifactId>
    +            <version>${project.version}</version>
    +        </dependency>
    +
    +        <dependency>
    +            <groupId>redis.clients</groupId>
    +            <artifactId>jedis</artifactId>
    +            <version>${jedis.vaersion}</version>
    --- End diff --
    
    version


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r51700184
  
    --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.redis;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import redis.clients.jedis.Jedis;
    +import redis.clients.jedis.JedisPool;
    +import redis.clients.jedis.JedisPoolConfig;
    +import redis.clients.jedis.Protocol;
    +import redis.clients.jedis.exceptions.JedisException;
    +
    +public class RedisSink<IN>  extends RichSinkFunction<IN> {
    +
    +	private static final int DEFAULT_TIMEOUT = 2000;
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger logger = LoggerFactory.getLogger(RedisSink.class);
    +
    +	private String host;
    +	private int port;
    +	private String channel;
    +	private int timeOut;
    +	private int soTimeOut;
    +	private int database;
    +	private String password;
    +	private String clientName;
    +
    +	private SerializationSchema<IN> schema;
    +	private transient JedisPoolConfig poolConfig;
    +	private transient JedisPool jedisPool;
    +
    +
    +	public RedisSink(String host, int port,  String channel, SerializationSchema<IN> schema) {
    +		this(host, port, channel, schema, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE, null);
    +	}
    +
    +	public RedisSink(String host, int port, String channel, SerializationSchema<IN> schema, int timeOut, int soTimeOut, String password, int database, String clientName) {
    +		this(host, port, channel, schema, timeOut, soTimeOut, password, database, clientName, null);
    +	}
    +
    +	public RedisSink(String host, int port, String channel, SerializationSchema<IN> schema, int timeOut, int soTimeOut, String password, int database, String clientName, JedisPoolConfig poolConfig) {
    +		this.host = host;
    +		this.port = port;
    +		this.channel = channel;
    +		this.schema = schema;
    +		this.timeOut = timeOut;
    +		this.soTimeOut = soTimeOut;
    +		this.password = password;
    +		this.database = database;
    +		this.clientName = clientName;
    +		this.poolConfig = poolConfig;
    +	}
    +
    +
    +	@Override
    +	public void invoke(IN value) throws Exception {
    +		try (Jedis jedis = jedisPool.getResource()) {
    +			byte[] msg = schema.serialize(value);
    +			jedis.publish(channel.getBytes(), msg);
    +		} catch (Exception e) {
    +			if (logger.isErrorEnabled()) {
    +				logger.error("Cannot send Redis message {}", channel);
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void open(Configuration parameters) throws Exception {
    +		if (this.poolConfig == null){
    +			this.poolConfig = new JedisPoolConfig();
    +		}
    +		this.jedisPool = new JedisPool(poolConfig, host, port, timeOut, soTimeOut, password, database, clientName);
    +	}
    +
    +	@Override
    +	public void close() throws Exception {
    +		if (!jedisPool.isClosed()){
    +		try {
    --- End diff --
    
    indention


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by subhankarb <gi...@git.apache.org>.
Github user subhankarb commented on the pull request:

    https://github.com/apache/flink/pull/1580#issuecomment-181340467
  
    @rmetzger sorry for late reply. 
    plz take a look at the gist https://gist.github.com/subhankarb/6a503378063819eb47e9
    user can choose which one they want to use at the time of creating sink and source[while source can contain only PUBSUB(subscribe) and list(pop)].



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r51699774
  
    --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.redis;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import redis.clients.jedis.Jedis;
    +import redis.clients.jedis.JedisPool;
    +import redis.clients.jedis.JedisPoolConfig;
    +import redis.clients.jedis.Protocol;
    +import redis.clients.jedis.exceptions.JedisException;
    +
    +public class RedisSink<IN>  extends RichSinkFunction<IN> {
    +
    +	private static final int DEFAULT_TIMEOUT = 2000;
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger logger = LoggerFactory.getLogger(RedisSink.class);
    +
    +	private String host;
    +	private int port;
    +	private String channel;
    +	private int timeOut;
    +	private int soTimeOut;
    +	private int database;
    +	private String password;
    +	private String clientName;
    +
    +	private SerializationSchema<IN> schema;
    +	private transient JedisPoolConfig poolConfig;
    +	private transient JedisPool jedisPool;
    +
    +
    +	public RedisSink(String host, int port,  String channel, SerializationSchema<IN> schema) {
    +		this(host, port, channel, schema, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE, null);
    +	}
    +
    +	public RedisSink(String host, int port, String channel, SerializationSchema<IN> schema, int timeOut, int soTimeOut, String password, int database, String clientName) {
    +		this(host, port, channel, schema, timeOut, soTimeOut, password, database, clientName, null);
    +	}
    +
    +	public RedisSink(String host, int port, String channel, SerializationSchema<IN> schema, int timeOut, int soTimeOut, String password, int database, String clientName, JedisPoolConfig poolConfig) {
    --- End diff --
    
    JavaDoc missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by subhankarb <gi...@git.apache.org>.
Github user subhankarb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r52162481
  
    --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.redis;
    +
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
    +import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    +
    +import redis.clients.jedis.BinaryJedisPubSub;
    +import redis.clients.jedis.JedisPool;
    +import redis.embedded.RedisServer;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.junit.Before;
    +import org.junit.After;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +public class RedisSinkTest extends StreamingMultipleProgramsTestBase {
    +
    +	private static final int NUM_ELEMENTS = 20;
    +	private static final int REDIS_PORT = 6379;
    --- End diff --
    
    thanks. fined this with NetUtils.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1580#issuecomment-184216894
  
    Any progress here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r51699856
  
    --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.redis;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import redis.clients.jedis.Jedis;
    +import redis.clients.jedis.JedisPool;
    +import redis.clients.jedis.JedisPoolConfig;
    +import redis.clients.jedis.Protocol;
    +import redis.clients.jedis.exceptions.JedisException;
    +
    +public class RedisSink<IN>  extends RichSinkFunction<IN> {
    +
    +	private static final int DEFAULT_TIMEOUT = 2000;
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger logger = LoggerFactory.getLogger(RedisSink.class);
    +
    +	private String host;
    +	private int port;
    +	private String channel;
    +	private int timeOut;
    +	private int soTimeOut;
    +	private int database;
    +	private String password;
    +	private String clientName;
    +
    +	private SerializationSchema<IN> schema;
    +	private transient JedisPoolConfig poolConfig;
    +	private transient JedisPool jedisPool;
    +
    +
    +	public RedisSink(String host, int port,  String channel, SerializationSchema<IN> schema) {
    +		this(host, port, channel, schema, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE, null);
    +	}
    +
    +	public RedisSink(String host, int port, String channel, SerializationSchema<IN> schema, int timeOut, int soTimeOut, String password, int database, String clientName) {
    +		this(host, port, channel, schema, timeOut, soTimeOut, password, database, clientName, null);
    +	}
    +
    +	public RedisSink(String host, int port, String channel, SerializationSchema<IN> schema, int timeOut, int soTimeOut, String password, int database, String clientName, JedisPoolConfig poolConfig) {
    +		this.host = host;
    --- End diff --
    
    Add checks for `null` and other invalid parameter values.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1580#issuecomment-180378937
  
    Actually, I worked a bit with redis recently ;) I'll take a look at the PR now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r51717544
  
    --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.redis;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import redis.clients.jedis.Jedis;
    +import redis.clients.jedis.JedisPool;
    +import redis.clients.jedis.JedisPoolConfig;
    +import redis.clients.jedis.Protocol;
    +import redis.clients.jedis.exceptions.JedisException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +
    +public class RedisSink<IN>  extends RichSinkFunction<IN> {
    +
    +	private static final long serialVersionUID = 1L;
    +	private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class);
    +
    +	private static final int DEFAULT_TIMEOUT = 2000;
    +
    +	private String host;
    +	private int port;
    +	private String channel;
    +	private int timeOut;
    +	private int soTimeOut;
    +	private int database;
    +	private String password;
    +	private String clientName;
    +
    +	private SerializationSchema<IN> schema;
    +	private transient JedisPoolConfig poolConfig;
    +	private transient JedisPool jedisPool;
    +
    +	/**
    +	 * Creates a new RedisSink. For passing custom connection Pool config, please use the constructor
    +	 * {@link RedisSink#RedisSink(String, int, String, SerializationSchema, int, int, String, int, String,
    +	 * JedisPoolConfig poolConfig)},
    +	 * @param host Redis Host name to connect to
    +	 * @param port Redis instance Port
    +	 * @param channel The channel to which data will be published
    +	 * @param schema A {@link SerializationSchema} for turning the java object to bytes.
    +	 */
    +	public RedisSink(String host, int port,  String channel, SerializationSchema<IN> schema) {
    +		this(host, port, channel, schema, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE, null, null);
    +	}
    +
    +	/**
    +	 * Creates a new RedisSink.
    +	 * @param host Redis Host name to connect to
    +	 * @param port Redis instance Port
    +	 * @param channel The channel to which data will be published
    +	 * @param schema A {@link SerializationSchema} for turning the java object to bytes.
    +	 * @param timeOut Connection Timeout in millisecond
    +	 * @param soTimeOut Socket Timeout in millisecond. Sets Socket::SO_TIMEOUT
    +	 * @param password Password for Redis Server
    +	 * @param database Select the DB with having the specified zero-based numeric index
    +	 * @param clientName Assigns a name to the current connection using CLIENT SETNAME command
    +	 * @param poolConfig Custom jedis connection pool configuration
    +	 */
    +	public RedisSink(String host, int port, String channel, SerializationSchema<IN> schema, int timeOut,
    +					int soTimeOut, String password, int database, String clientName, JedisPoolConfig poolConfig) {
    +		this.host = host;
    +		this.port = port;
    +		this.channel = channel;
    +		this.schema = schema;
    +		this.timeOut = timeOut;
    +		this.soTimeOut = soTimeOut;
    +		this.password = password;
    +		this.database = database;
    +		this.clientName = clientName;
    +		this.poolConfig = poolConfig;
    +	}
    +
    +	/**
    +	 * Called when new data arrives to the sink, and forwards it to Redis channel.
    +	 *
    +	 * @param value The incoming data
    +	 */
    +	@Override
    +	public void invoke(IN value) throws Exception {
    +		try (Jedis jedis = jedisPool.getResource()) {
    +			byte[] msg = schema.serialize(value);
    +			jedis.publish(channel.getBytes(), msg);
    +		} catch (Exception e) {
    +			if (LOG.isErrorEnabled()) {
    +				LOG.error("Cannot send Redis message {}", channel);
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void open(Configuration parameters) throws Exception {
    +
    +		Preconditions.checkNotNull(this.host, "Redis host name should not be Null");
    --- End diff --
    
    Why do you check here and not in constructor? Checking in constructor will give error on client side, ie, the error is detected earlier. The check here, would be done in the TaskManager (ie, on the cluster). This is way harder to debug for a user.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by subhankarb <gi...@git.apache.org>.
Github user subhankarb closed the pull request at:

    https://github.com/apache/flink/pull/1580


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by subhankarb <gi...@git.apache.org>.
Github user subhankarb commented on the pull request:

    https://github.com/apache/flink/pull/1580#issuecomment-180679993
  
    @rmetzger when i started i had only pubsub in my mind. today i thought about added list, sorted set for sink+source and hash and set for sink only. i'll update the pull req.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r51700142
  
    --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.redis;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import redis.clients.jedis.Jedis;
    +import redis.clients.jedis.JedisPool;
    +import redis.clients.jedis.JedisPoolConfig;
    +import redis.clients.jedis.Protocol;
    +import redis.clients.jedis.exceptions.JedisException;
    +
    +public class RedisSink<IN>  extends RichSinkFunction<IN> {
    +
    +	private static final int DEFAULT_TIMEOUT = 2000;
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger logger = LoggerFactory.getLogger(RedisSink.class);
    +
    +	private String host;
    +	private int port;
    +	private String channel;
    +	private int timeOut;
    +	private int soTimeOut;
    +	private int database;
    +	private String password;
    +	private String clientName;
    +
    +	private SerializationSchema<IN> schema;
    +	private transient JedisPoolConfig poolConfig;
    +	private transient JedisPool jedisPool;
    +
    +
    +	public RedisSink(String host, int port,  String channel, SerializationSchema<IN> schema) {
    +		this(host, port, channel, schema, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE, null);
    +	}
    +
    +	public RedisSink(String host, int port, String channel, SerializationSchema<IN> schema, int timeOut, int soTimeOut, String password, int database, String clientName) {
    +		this(host, port, channel, schema, timeOut, soTimeOut, password, database, clientName, null);
    +	}
    +
    +	public RedisSink(String host, int port, String channel, SerializationSchema<IN> schema, int timeOut, int soTimeOut, String password, int database, String clientName, JedisPoolConfig poolConfig) {
    +		this.host = host;
    +		this.port = port;
    +		this.channel = channel;
    +		this.schema = schema;
    +		this.timeOut = timeOut;
    +		this.soTimeOut = soTimeOut;
    +		this.password = password;
    +		this.database = database;
    +		this.clientName = clientName;
    +		this.poolConfig = poolConfig;
    +	}
    +
    +
    +	@Override
    +	public void invoke(IN value) throws Exception {
    +		try (Jedis jedis = jedisPool.getResource()) {
    +			byte[] msg = schema.serialize(value);
    +			jedis.publish(channel.getBytes(), msg);
    +		} catch (Exception e) {
    +			if (logger.isErrorEnabled()) {
    +				logger.error("Cannot send Redis message {}", channel);
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void open(Configuration parameters) throws Exception {
    +		if (this.poolConfig == null){
    +			this.poolConfig = new JedisPoolConfig();
    +		}
    +		this.jedisPool = new JedisPool(poolConfig, host, port, timeOut, soTimeOut, password, database, clientName);
    +	}
    +
    +	@Override
    +	public void close() throws Exception {
    +		if (!jedisPool.isClosed()){
    --- End diff --
    
    `null` check for `jedisPool` missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r52022185
  
    --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.redis;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import redis.clients.jedis.Jedis;
    +import redis.clients.jedis.JedisPool;
    +import redis.clients.jedis.JedisPoolConfig;
    +import redis.clients.jedis.Protocol;
    +import redis.clients.jedis.exceptions.JedisException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +
    +/**
    + * A sink that delivers data to a Redis channel using the Jedis client.
    + *
    + *  <p>
    + * When creating the sink {@code host, port, schema} must be specified or else it will throw
    + * {@link NullPointerException}
    + *
    + *  * <p>
    + * Example:
    + *
    + * <pre>{@code
    + *     new RedisSink<String>(host, port, schema)
    + * }</pre>
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public class RedisSink<IN> extends RichSinkFunction<IN> {
    +
    +	private static final long serialVersionUID = 1L;
    +	private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class);
    +
    +	private static final int DEFAULT_TIMEOUT = 2000;
    +
    +	private String host;
    +	private int port;
    +	private String channel;
    +	private int timeOut;
    +	private int soTimeOut;
    +	private int database;
    +	private String password;
    +	private String clientName;
    +
    +	private SerializationSchema<IN> schema;
    +	private transient JedisPoolConfig poolConfig;
    +	private transient JedisPool jedisPool;
    +
    +	/**
    +	 * Creates a new RedisSink. For passing custom connection Pool config, please use the constructor
    +	 * {@link RedisSink#RedisSink(String, int, String, SerializationSchema, int, int, String, int, String,
    +	 * JedisPoolConfig poolConfig)},
    +	 * @param host Redis Host name to connect to
    +	 * @param port Redis instance Port
    +	 * @param channel The channel to which data will be published
    +	 * @param schema A {@link SerializationSchema} for turning the java object to bytes.
    +	 */
    +	public RedisSink(String host, int port,  String channel, SerializationSchema<IN> schema) {
    +		this(host, port, channel, schema, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE, null, null);
    +	}
    +
    +	/**
    +	 * Creates a new RedisSink.
    +	 * @param host Redis Host name to connect to
    +	 * @param port Redis instance Port
    +	 * @param channel The channel to which data will be published
    +	 * @param schema A {@link SerializationSchema} for turning the java object to bytes.
    +	 * @param timeOut Connection Timeout in millisecond
    +	 * @param soTimeOut Socket Timeout in millisecond. Sets Socket::SO_TIMEOUT
    +	 * @param password Password for Redis Server
    +	 * @param database Select the DB with having the specified zero-based numeric index
    +	 * @param clientName Assigns a name to the current connection using CLIENT SETNAME command
    +	 * @param poolConfig Custom jedis connection pool configuration
    +	 */
    +	public RedisSink(String host, int port, String channel, SerializationSchema<IN> schema, int timeOut,
    +					int soTimeOut, String password, int database, String clientName, JedisPoolConfig poolConfig) {
    +
    +		Preconditions.checkNotNull(host, "Redis host name should not be Null");
    +		Preconditions.checkNotNull(channel, "Redis Channel name can not be null");
    +		Preconditions.checkNotNull(schema, "SerializationSchema should not be Null");
    +
    +		this.host = host;
    +		this.port = port;
    +		this.channel = channel;
    +		this.schema = schema;
    +		this.timeOut = timeOut;
    +		this.soTimeOut = soTimeOut;
    +		this.password = password;
    +		this.database = database;
    +		this.clientName = clientName;
    +		this.poolConfig = poolConfig;
    +	}
    +
    +	/**
    +	 * Called when new data arrives to the sink, and forwards it to Redis channel.
    +	 *
    +	 * @param value The incoming data
    +	 */
    +	@Override
    +	public void invoke(IN value) throws Exception {
    +		try (Jedis jedis = jedisPool.getResource()) {
    +			byte[] msg = schema.serialize(value);
    +			jedis.publish(channel.getBytes(), msg);
    +		} catch (Exception e) {
    +			if (LOG.isErrorEnabled()) {
    +				LOG.error("Cannot send Redis message {}", channel);
    --- End diff --
    
    You should log the exception as well!
    
    It would be helpful to add a flag which controls the error behavior. Some users want their system to fail with an exception if its unable to deliver a message (Flink's fault tolerance would restart the job and retry!)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r52023113
  
    --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.redis;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import redis.clients.jedis.Jedis;
    +import redis.clients.jedis.JedisPool;
    +import redis.clients.jedis.JedisPoolConfig;
    +import redis.clients.jedis.Protocol;
    +import redis.clients.jedis.exceptions.JedisException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +
    +/**
    + * A sink that delivers data to a Redis channel using the Jedis client.
    + *
    + *  <p>
    + * When creating the sink {@code host, port, schema} must be specified or else it will throw
    + * {@link NullPointerException}
    + *
    + *  * <p>
    + * Example:
    + *
    + * <pre>{@code
    + *     new RedisSink<String>(host, port, schema)
    + * }</pre>
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public class RedisSink<IN> extends RichSinkFunction<IN> {
    +
    +	private static final long serialVersionUID = 1L;
    +	private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class);
    +
    +	private static final int DEFAULT_TIMEOUT = 2000;
    +
    +	private String host;
    +	private int port;
    +	private String channel;
    +	private int timeOut;
    +	private int soTimeOut;
    +	private int database;
    +	private String password;
    +	private String clientName;
    +
    +	private SerializationSchema<IN> schema;
    +	private transient JedisPoolConfig poolConfig;
    +	private transient JedisPool jedisPool;
    +
    +	/**
    +	 * Creates a new RedisSink. For passing custom connection Pool config, please use the constructor
    +	 * {@link RedisSink#RedisSink(String, int, String, SerializationSchema, int, int, String, int, String,
    +	 * JedisPoolConfig poolConfig)},
    +	 * @param host Redis Host name to connect to
    +	 * @param port Redis instance Port
    +	 * @param channel The channel to which data will be published
    +	 * @param schema A {@link SerializationSchema} for turning the java object to bytes.
    +	 */
    +	public RedisSink(String host, int port,  String channel, SerializationSchema<IN> schema) {
    +		this(host, port, channel, schema, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE, null, null);
    +	}
    +
    +	/**
    +	 * Creates a new RedisSink.
    +	 * @param host Redis Host name to connect to
    +	 * @param port Redis instance Port
    +	 * @param channel The channel to which data will be published
    +	 * @param schema A {@link SerializationSchema} for turning the java object to bytes.
    +	 * @param timeOut Connection Timeout in millisecond
    +	 * @param soTimeOut Socket Timeout in millisecond. Sets Socket::SO_TIMEOUT
    +	 * @param password Password for Redis Server
    +	 * @param database Select the DB with having the specified zero-based numeric index
    +	 * @param clientName Assigns a name to the current connection using CLIENT SETNAME command
    +	 * @param poolConfig Custom jedis connection pool configuration
    +	 */
    +	public RedisSink(String host, int port, String channel, SerializationSchema<IN> schema, int timeOut,
    +					int soTimeOut, String password, int database, String clientName, JedisPoolConfig poolConfig) {
    +
    +		Preconditions.checkNotNull(host, "Redis host name should not be Null");
    +		Preconditions.checkNotNull(channel, "Redis Channel name can not be null");
    +		Preconditions.checkNotNull(schema, "SerializationSchema should not be Null");
    +
    +		this.host = host;
    +		this.port = port;
    +		this.channel = channel;
    +		this.schema = schema;
    +		this.timeOut = timeOut;
    +		this.soTimeOut = soTimeOut;
    +		this.password = password;
    +		this.database = database;
    +		this.clientName = clientName;
    +		this.poolConfig = poolConfig;
    +	}
    +
    +	/**
    +	 * Called when new data arrives to the sink, and forwards it to Redis channel.
    +	 *
    +	 * @param value The incoming data
    +	 */
    +	@Override
    +	public void invoke(IN value) throws Exception {
    +		try (Jedis jedis = jedisPool.getResource()) {
    +			byte[] msg = schema.serialize(value);
    +			jedis.publish(channel.getBytes(), msg);
    +		} catch (Exception e) {
    +			if (LOG.isErrorEnabled()) {
    +				LOG.error("Cannot send Redis message {}", channel);
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void open(Configuration parameters) throws Exception {
    +
    +		if (this.poolConfig == null){
    +			this.poolConfig = new JedisPoolConfig();
    +		}
    +		if (this.jedisPool == null){
    +			this.jedisPool = new JedisPool(poolConfig, host, port, timeOut, soTimeOut, password, database, clientName);
    --- End diff --
    
    So this means that every parallel Redis Sink instance in Flink will keep a pool of Jedis connections?
    
    If a user is running Flink on a TaskManager with 8 slots, and each of the slots has 8 jedis connections (8 is the default number of pooled instances), we have 64 connections to redis from one machine.
    
    I wonder if it would make sense to allow the user to change the pool size (if it becomes an issue, they can set the size to 1) .


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1580#issuecomment-180385898
  
    Thanks a lot for working on this!
    
    I like the pull request! 
    Just out of curiosity, did you test the throughput you can get with the connector?
    
    This sink is only geared towards redis publish/subscribe system. What about usecases when users want to put data into redis KV store, or into a hashmap for a key?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r51717701
  
    --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.redis;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import redis.clients.jedis.Jedis;
    +import redis.clients.jedis.JedisPool;
    +import redis.clients.jedis.JedisPoolConfig;
    +import redis.clients.jedis.Protocol;
    +import redis.clients.jedis.exceptions.JedisException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +
    +public class RedisSink<IN>  extends RichSinkFunction<IN> {
    --- End diff --
    
    JavaDoc missing
    eliminate double blank after `RedisSink<IN>`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1580#issuecomment-187184409
  
    Sorry, I'm currently very busy with the 1.0 release. I hope I'll find some time to look into this PR again later this week.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1580#issuecomment-179257332
  
    Please add documentation for the web page. For details, see here: https://github.com/apache/flink/tree/master/docs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1580#issuecomment-180746275
  
    How do you want to implement this? I'm asking because I don't see an obvious way to expose this to the user.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1580#discussion_r52023695
  
    --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.redis;
    +
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
    +import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    +
    +import redis.clients.jedis.BinaryJedisPubSub;
    +import redis.clients.jedis.JedisPool;
    +import redis.embedded.RedisServer;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.junit.Before;
    +import org.junit.After;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +public class RedisSinkTest extends StreamingMultipleProgramsTestBase {
    +
    +	private static final int NUM_ELEMENTS = 20;
    +	private static final int REDIS_PORT = 6379;
    --- End diff --
    
    I'm not sure if hardcoding a port is a good idea. The system executing the test might have that port blocked.
    
    I think our `NetUtils` have a method for finding an available port. Or maybe you can alloc the server on port 0 and let the OS choose?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---