Posted to issues@flink.apache.org by mans2singh <gi...@git.apache.org> on 2016/05/25 01:32:25 UTC

[GitHub] flink pull request: FLINK-3967 - Flink Sink for Rethink Db

GitHub user mans2singh opened a pull request:

    https://github.com/apache/flink/pull/2031

    FLINK-3967 - Flink Sink for Rethink Db

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mans2singh/flink FLINK-3967

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2031.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2031
    
----
commit 98507a314587890f20b888a21df0b4b574f168a7
Author: mans2singh <ma...@yahoo.com>
Date:   2016-05-25T00:54:44Z

    FLINK-3967 - Flink Sink for Rethink Db

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: FLINK-3967 - Flink Sink for Rethink Db

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2031#discussion_r64727151
  
    --- Diff: flink-streaming-connectors/flink-connector-rethinkdb/pom.xml ---
    @@ -0,0 +1,100 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    +
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.flink</groupId>
    +		<artifactId>flink-streaming-connectors</artifactId>
    +		<version>1.1-SNAPSHOT</version>
    +		<relativePath>..</relativePath>
    +	</parent>
    +
    +	<artifactId>flink-connector-rethinkdb_2.10</artifactId>
    +	<name>flink-connector-rethinkdb</name>
    +
    +	<packaging>jar</packaging>
    +
    +	<!-- Allow users to pass custom connector versions -->
    +	<properties>
    +		<rethinkdb.version>2.3.0</rethinkdb.version>
    +	</properties>
    +
    +	<dependencies>
    +
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-streaming-java_2.10</artifactId>
    +			<version>${project.version}</version>
    +			<scope>provided</scope>
    +		</dependency>
    +
    +        <dependency>
    +            <groupId>com.google.guava</groupId>
    +            <artifactId>guava</artifactId>
    +            <version>${guava.version}</version>
    +        </dependency>
    +
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-java_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    +
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-tests_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +        </dependency>
    +
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-test-utils_2.10</artifactId>
    +            <version>${project.version}</version>
    +      			<type>test-jar</type>
    +            <scope>test</scope>
    +        </dependency>
    +	<dependency>
    +	    <groupId>com.rethinkdb</groupId>
    +	    <artifactId>rethinkdb-driver</artifactId>
    +	    <version>${rethinkdb.version}</version>
    +        </dependency>
    --- End diff --
    
    Hi @StephanEwen - The second option mentioned above (https://github.com/npiv/rethink-java-driver) is available under the Apache License 2.0. Is that acceptable for a Flink contribution?
    Thanks



[GitHub] flink pull request: FLINK-3967 - Flink Sink for Rethink Db

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2031#discussion_r64720893
  
    --- Diff: flink-streaming-connectors/flink-connector-rethinkdb/pom.xml ---
    @@ -0,0 +1,100 @@
    +	<dependency>
    +	    <groupId>com.rethinkdb</groupId>
    +	    <artifactId>rethinkdb-driver</artifactId>
    +	    <version>${rethinkdb.version}</version>
    +        </dependency>
    --- End diff --
    
    Hi @rmetzger 
    
    Thanks for pointing out the license issue. I wanted to get your advice on two other options:
    
    1. Can I contribute it the way other Flink contributions are made (under the flink-contrib module)?
    2. There is another Java driver for RethinkDB (https://github.com/npiv/rethink-java-driver), available under the Apache License 2.0. If that is acceptable for a contribution to Flink, I can see whether I can migrate the sink to that driver.
    
    If you have any other recommendations, please let me know.
    
    Thanks.



[GitHub] flink pull request: FLINK-3967 - Flink Sink for Rethink Db

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2031#discussion_r64724883
  
    --- Diff: flink-streaming-connectors/flink-connector-rethinkdb/pom.xml ---
    @@ -0,0 +1,100 @@
    +	<dependency>
    +	    <groupId>com.rethinkdb</groupId>
    +	    <artifactId>rethinkdb-driver</artifactId>
    +	    <version>${rethinkdb.version}</version>
    +        </dependency>
    --- End diff --
    
    Unfortunately, anything that ends up in the Flink git repository must be Apache License compatible, regardless of whether it is `flink-core` or `flink-contrib`.



[GitHub] flink pull request: FLINK-3967 - Flink Sink for Rethink Db

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/flink/pull/2031#issuecomment-222030024
  
    Hi @rmetzger 
    
    Thanks for your advice/suggestions.  I will try to answer your questions below:
    
    Regarding the motivation for the connector - I started working with Flink late last year and found it had some unique features (such as a streaming framework built on events from the ground up and flexible windowing options). I am working through some real-time data flow use cases where these capabilities can streamline our processing pipelines. The integration with RethinkDB came into play because, from the dev perspective, it is schemaless, can ingest streams or batches of data, and has map/reduce functionality. From the ops/scaling perspective, it can be scaled and re-sharded in real time without downtime. It can also provide change streams for a table or a document. IMHO, Flink and RethinkDB complement each other for scalable stream processing/analytics use cases. So I thought it might be good to contribute back to the open source community, hence the PR.
    
    Regarding the guidelines for code contribution - I did go through the document, and I thought this PR would be in the same vein as the other streaming connectors (Kafka, etc.) and would complement them. There was no new API or change in interfaces, and the code base is pretty lightweight because Flink and RethinkDB both make it easy to integrate. However, I did not realize that it would be considered a new feature requiring a design review; that was my oversight.
    
    In any case, I really appreciate the time you and your team took to review the PR and advise me. Please let me know how you recommend I proceed.
    
    Thanks, Mans




[GitHub] flink pull request: FLINK-3967 - Flink Sink for Rethink Db

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/2031#issuecomment-221497411
  
    All travis builds failed.
    
    For Java 8 several RethinkDB tests failed.
    
    For Java 7 we get an Unsupported Major/Minor version error. Does this Sink (or rather the driver) support Java 7?
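
The Java 7 error above is what a JVM reports when it loads classes compiled for a newer release: every class file records a major version in its header (51 for Java 7, 52 for Java 8). As a minimal sketch, and not part of the PR, the following shows how that version can be read from the first eight bytes of a class file. The class name `ClassFileVersion` and its methods are hypothetical names chosen for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration only; not part of the Flink PR.
public class ClassFileVersion {

    /** Returns the major version stored in a class-file header. */
    public static int majorVersion(byte[] classBytes) {
        ByteBuffer buf = ByteBuffer.wrap(classBytes); // big-endian by default
        // A class file starts with the magic number 0xCAFEBABE.
        if (buf.remaining() < 8 || buf.getInt() != 0xCAFEBABE) {
            throw new IllegalArgumentException("Not a class file header");
        }
        buf.getShort();                 // skip the minor version
        return buf.getShort() & 0xFFFF; // major version, read as unsigned
    }

    /** Maps a major version to its Java release, e.g. 51 -> 7, 52 -> 8. */
    public static int javaRelease(int major) {
        return major - 44;
    }

    public static void main(String[] args) {
        // Header bytes as a Java 8 compiler would write them (major = 52).
        byte[] header = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE, 0, 0, 0, 52};
        int major = majorVersion(header);
        // prints "major=52 -> Java 8"
        System.out.println("major=" + major + " -> Java " + javaRelease(major));
    }
}
```

Running the same check against a class taken from the driver jar would show whether it targets a release newer than the JVM used by the Java 7 build.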



[GitHub] flink issue #2031: FLINK-3967 - Flink Sink for Rethink Db

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/flink/pull/2031
  
    Hi @rmetzger - 
    Thanks for the update.  
    Can you please let me know the steps for contributing to Bahir? Can I just fork it and add another project to the root folder?
    Thanks



[GitHub] flink pull request: FLINK-3967 - Flink Sink for Rethink Db

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/flink/pull/2031#issuecomment-221737718
  
    Hey @zentol - Thanks for your feedback. 
    - I've renamed the sink as you recommended.
    - Currently the driver from RethinkDB supports Java 8 only. There is one supporting Java 6, but it is no longer being maintained.
    - The current Travis failures for JDK 7 are caused by the version issue you mentioned, and those for Java 8 are due to failures in the yarn-tests module.
    - I've not done any performance tests. If you have any suggestions for throughput numbers that might be valuable, or if there is another sink that I can use to set up a baseline test, please let me know.
    
    If you have any other advice/recommendation, please let me know.   
    
    Thanks again



[GitHub] flink issue #2031: FLINK-3967 - Flink Sink for Rethink Db

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2031
  
    Hi @mans2singh !
    
    This looks like a good piece of work, and RethinkDB is an interesting technology!
    An Apache-compatible license is also good.
    
    We are facing a bit of a maintenance issue right now, though, with a large number of new projects. There are so many possible DBs and connectors, and it is tough for the community to maintain them all. For that reason, I would suggest staging such contributions of new and interesting connectors.
    
    Could you put this into a standalone GitHub project for now? We would link to this repository from the Apache Flink website and that way make it visible to the Flink community.
    When we see that we get frequent requests for a RethinkDB connector, we would like to move this to the core Flink code base.
    
    Following this idea, we would end up with frequently used connectors in the core Flink repository, and a larger number of connectors in third-party community repositories. I think that would help the stability of the project and all connectors.
    
    Stephan



[GitHub] flink pull request: FLINK-3967 - Flink Sink for Rethink Db

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2031#discussion_r64709407
  
    --- Diff: flink-streaming-connectors/flink-connector-rethinkdb/pom.xml ---
    @@ -0,0 +1,100 @@
    +	<dependency>
    +	    <groupId>com.rethinkdb</groupId>
    +	    <artifactId>rethinkdb-driver</artifactId>
    +	    <version>${rethinkdb.version}</version>
    +        </dependency>
    --- End diff --
    
    What's the license of this driver?
    RethinkDB seems to use the GNU Affero license (https://github.com/rethinkdb/rethinkdb/blob/next/COPYRIGHT), which is not compatible with the Apache License (http://www.apache.org/legal/resolved.html#category-x), so we cannot accept this contribution.



[GitHub] flink issue #2031: FLINK-3967 - Flink Sink for Rethink Db

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/flink/pull/2031
  
    Hey @rmetzger, @StephanEwen, Flink Folks:  
    
    Just wanted to let you know that the RethinkDB Java driver that I used for the Flink integration is available under the Apache License 2.0 (https://github.com/rethinkdb/rethinkdb/blob/next/drivers/COPYRIGHT).
    
    I've checked the Travis build: the RethinkDB connector is passing, but the last failure was in the flink-yarn-tests module.
    
    Please let me know if you have any advice/comments for me.
    
    Thanks for your time.



[GitHub] flink pull request: FLINK-3967 - Flink Sink for Rethink Db

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/2031#issuecomment-221886697
  
    Hi @mans2singh,
    thanks a lot for this contribution. I wonder what your motivation is for implementing a Flink --> RethinkDB connector? I'm asking because the Flink community cannot accept every contribution, in particular if it's adding a large new component we need to maintain.
    We can accept a contribution only if there are enough users and developers willing to support the feature.
    I'm a bit sorry to tell you this after you've implemented the code; on the other hand, our guidelines [1] clearly state that new features need to be discussed with the community first.
    
    The good news is that we have a special section for community-maintained modules on our website [2]. Therefore, I suggest that you put the code into a separate GitHub repository and open a pull request to add a link to that repository.
    Another good thing about this approach is that we don't have to fix the license issue immediately.
    Also, this doesn't mean that I'm against this contribution at all. If we see many users using the module from your GitHub repository, and there are people in the community willing to maintain it, we can also look into merging it.
    
    [1] http://flink.apache.org/contribute-code.html#before-you-start-coding
    [2] http://flink.apache.org/community.html#third-party-packages




[GitHub] flink pull request: FLINK-3967 - Flink Sink for Rethink Db

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2031#discussion_r64527750
  
    --- Diff: flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/FlinkRethinkDbSink.java ---
    @@ -0,0 +1,289 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.rethinkdb;
    +
    +import java.io.Serializable;
    +import java.util.HashMap;
    +import java.util.Objects;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.rethinkdb.RethinkDB;
    +import com.rethinkdb.gen.ast.Insert;
    +import com.rethinkdb.gen.ast.Table;
    +import com.rethinkdb.net.Connection;
    +
    +/**
    + * This class is the Flink sink for RethinkDB which is a tcp/JSON protocol based document
    + * oriented NoSQL database.
    + * 
    + * <p/>
    + * This sink provides two constuctors:
    + * <p/>
    + * {@link #FlinkRethinkDbSink(String hostname, int hostport, String database, String table, JSONSerializationSchema schema)}, and
    + * <p/>
    + * {@link #FlinkRethinkDbSink(String hostname, int hostport, String database, String table, JSONSerializationSchema schema, ConflictStrategy conflictStrategy)}
    + * <p/>
    + * 
    + * The parameter for the constructor are as follows:
    + * <p/>
    + * <ul>
    + * <li>hostname - the rethinkdb hostname</li>
    + * <li>hosport - the rethinkdb port for the driver to connect</li>
    + * <li>database - the rethinkdb database name to which the table belongs</li>
    + * <li>table - the rethinkdb table name where documents are inserted</li>
    + * <li>schema - the schema tranfromer that converts input to JSONObject, or JSONArray</li>
    + * <li>conflictStrategy - the conflict resolution strategy in case inserted document has id which exists in the db</li>
    + * </ul>
    + * <p/>
    + *
    + * The user can also set:
    + * <p/>
    + * <ul>
    + * <li>username - default is admin</li>
    + * <li>password - default is blank</li>
    + * </ul>
    + * <p/> with the {@link #setUsernameAndPassword(String, String)} method.
    + * <p/>
    + * <b>NOTE: If multiple documents are inserted (e.g. using a JSONArray), the sink
    + * checks the error entry in the result HashMap and throws a runtime exception if the error
    + * count is not zero.  The exception message contains the result HashMap.
    + * In case of multiple errors, only the first error is noted in the result HashMap.
    + * </b>
    + * 
    + * @see {@link ConflictStrategy} for conflict resolution strategies
    + * 
    + * @param <OUT> a value that can be transformed into a {@link org.json.simple.JSONArray} or {@link org.json.simple.JSONObject}
    + */
    +public class FlinkRethinkDbSink<OUT> extends RichSinkFunction<OUT> implements Serializable{
    +
    +	/**
    +	 * Serial version for the class
    +	 */
    +	private static final long serialVersionUID = -2135499016796158755L;
    +
    +	/**
    +	 * Logger for the class
    +	 */
    +	private static final Logger LOG = LoggerFactory.getLogger(FlinkRethinkDbSink.class);
    +
    +	/**
    +	 * Conflict resolution option key in case document ids are same 
    +	 */
    +	public static final String CONFLICT_OPT = "conflict";
    +
    +	/**
    +	 * Result key indicating number of errors
    +	 */
    +	public static final String RESULT_ERROR_KEY = "errors";
    +
    +	/**
    +	 * Serialization schema for the sink
    +	 */
    +	private JSONSerializationSchema<OUT> serializationSchema;
    +
    +	/**
    +	 * RethinkDB connection object
    +	 */
    +	private transient Connection rethinkDbConnection;
    +
    +	/**
    +	 * RethinkDB hostname
    +	 */
    +	private String hostname;
    +
    +	/**
    +	 * RethinkDB port
    +	 */
    +	private int hostport;
    +
    +	/**
    +	 * RethinkDB tablename where documents are inserted
    +	 */
    +	private String tableName;
    +
    +	/**
    +	 * RethinkDB database where documents are inserted
    +	 */
    +	private String databaseName;
    +
    +	/**
    +	 * Conflict resolution strategy
    +	 */
    +	private ConflictStrategy conflict;
    +	
    +	/**
    +	 * Default user name
    +	 */
    +	public static final String DEFAULT_USER_NAME = "admin";
    +	
    +	/**
    +	 * User name
    +	 */
    +	private String username = DEFAULT_USER_NAME;
    +	
    +	/**
    +	 * Default password
    +	 */
    +	public static final String DEFAULT_PASSWORD = "";
    +	
    +	/**
    +	 * password
    +	 */
    +	private String password = DEFAULT_PASSWORD;
    +
    +	
    +	/**
    +	 * Constructor for RethinkDB sink
    +	 * @param hostname
    +	 * @param hostport
    +	 * @param database
    +	 * @param table
    +	 * @param schema
    +	 */
    +	public FlinkRethinkDbSink(String hostname, int hostport, String database, String table, 
    +			JSONSerializationSchema<OUT> schema) {
    +		this(hostname, hostport, database, table, schema, ConflictStrategy.update);
    +	}
    +
    +	/**
    +	 * Constructor for sink
    +	 * @param hostname
    +	 * @param hostport
    +	 * @param database name
    +	 * @param table name
    +	 * @param schema serialization converter
    +	 * @param conflict resolution strategy for document id conflict
    +	 */
    +	public FlinkRethinkDbSink(String hostname, int hostport, String database, String table, 
    +			JSONSerializationSchema<OUT> schema, 
    +			ConflictStrategy conflict) {
    +		this.hostname = Objects.requireNonNull(hostname);
    +		this.hostport = hostport;
    +		this.databaseName = Objects.requireNonNull(database);
    +		this.tableName = Objects.requireNonNull(table);
    +		this.serializationSchema = Objects.requireNonNull(schema);
    +		this.conflict = conflict;
    +	}
    +
    +	/**
    +	 * Open the sink
    +	 */
    +	@Override
    +	public void open(Configuration parameters) throws Exception {
    +		LOG.info("Received parameters : {}", parameters);
    +		
    +		super.open(parameters);
    +
    +		rethinkDbConnection = getRethinkDB().connection().hostname(hostname)
    +				.port(hostport).user(username, password).connect();
    +
    +		LOG.info("RethinkDb connection created for host {} port {} and db {}", 
    +				hostname, hostport,databaseName);
    +	}
    +
    +	/**
    +	 * Helper method to help testability
    +	 * @return RethinkDB instance
    +	 */ 
    +	protected RethinkDB getRethinkDB() {
    +		return RethinkDB.r;
    +	}
    +	
    +	/**
    +	 * Set username and password. If username and password are not provided,
    +	 * then default username (admin) and blank password are used.
    +	 * 
    +	 * @param username
    +	 * @param password
    +	 * 
    +	 * @throws IllegalArgumentException if either argument is null or blank
    +	 */
    +	public void setUsernameAndPassword(String username, String password) {
    +		
    +		if ( StringUtils.isBlank(username) )  {
    +			throw new IllegalArgumentException("username " + username + " cannot be null or empty" ); 
    +		} else {
    +			this.username = username;
    +		}
    +		
    +		if ( StringUtils.isBlank(password) ) {
    +			throw new IllegalArgumentException("password " + password + " cannot be null or empty" ); 
    +		} else {
    +			this.password = password;
    +		}
    +	}
    +	
    +	/**
    +	 * Invoke the sink with the input
    +	 * 
    +	 * @param value the value to be inserted
    +	 * 
    +	 * @throws RuntimeException if there are errors while inserting row into rethinkdb
    +	 */
    +	@Override
    +	public void invoke(OUT value) throws Exception {
    +		LOG.debug("Received value {}", value);
    +		
    +		Object json = serializationSchema.toJSON(value);
    +		LOG.debug("Object/Json: {}/{}", value, json);
    +		Insert insert = getRdbTable().insert(json).optArg(CONFLICT_OPT, conflict.toString());
    +		HashMap<String,Object> result = runInsert(insert);
    +		
    +		LOG.debug("Object/Json/Result: {}/{}/{}", value, json, result);
    +		
    +		if ( (Long)result.get(RESULT_ERROR_KEY) != 0 ) {
    --- End diff --
    
    This is a synchronous operation, correct? If so, I'd be curious to see a benchmark for this sink.
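
One way to gather the kind of numbers asked about above is a small timing loop around a blocking call. The following is a minimal sketch under stated assumptions, not code from the pull request: `SyncSinkTiming`, `timeInvocations`, and `recordsPerSecond` are hypothetical names, and the sink is a stand-in that sleeps for a millisecond per record instead of talking to RethinkDB.

```java
import java.util.function.Consumer;

/** Hypothetical timing harness; none of these names come from the pull request. */
class SyncSinkTiming {

    /** Calls the sink once per record, blocking on each call, and returns elapsed nanoseconds. */
    static long timeInvocations(Consumer<String> sink, int records) {
        long start = System.nanoTime();
        for (int i = 0; i < records; i++) {
            // Each call returns only after the sink has finished with the record.
            sink.accept("{\"id\":" + i + "}");
        }
        return System.nanoTime() - start;
    }

    /** Converts a record count and an elapsed time into records per second. */
    static double recordsPerSecond(int records, long elapsedNanos) {
        return records / (elapsedNanos / 1_000_000_000.0);
    }

    public static void main(String[] args) {
        // Assumption: a blocking insert is simulated with a 1 ms sleep per record.
        Consumer<String> fakeSink = json -> {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int records = 200;
        long elapsed = timeInvocations(fakeSink, records);
        System.out.printf("%d records in %.1f ms (%.0f records/s)%n",
                records, elapsed / 1e6, recordsPerSecond(records, elapsed));
    }
}
```

Swapping the stand-in for a call into the real sink would show how much the per-record round trip limits throughput; a proper benchmark would also need warm-up runs and a live database.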


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2031: FLINK-3967 - Flink Sink for Rethink Db

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2031
  
    Hi @mans2singh !
    There's good news regarding streaming connector contributions in Apache Flink: If you are interested in hosting the code as part of an Apache project, the Apache Bahir project now has a repository for hosting Flink connectors as well: https://github.com/apache/bahir-flink
    If you would like to contribute the connector there, feel free to open a new pull request there.
    
    Regards,
    Robert



[GitHub] flink issue #2031: FLINK-3967 - Flink Sink for Rethink Db

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2031
  
    Hi @mans2singh,
    I'm happy to hear that you are interested in contributing the code to Bahir.
    
    You can just fork the bahir-flink repo and create a folder similar to the existing connectors.




[GitHub] flink pull request: FLINK-3967 - Flink Sink for Rethink Db

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2031#discussion_r64527910
  
    --- Diff: flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/FlinkRethinkDbSink.java ---
    @@ -0,0 +1,289 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.rethinkdb;
    +
    +import java.io.Serializable;
    +import java.util.HashMap;
    +import java.util.Objects;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.rethinkdb.RethinkDB;
    +import com.rethinkdb.gen.ast.Insert;
    +import com.rethinkdb.gen.ast.Table;
    +import com.rethinkdb.net.Connection;
    +
    +/**
    + * This class is the Flink sink for RethinkDB which is a tcp/JSON protocol based document
    + * oriented NoSQL database.
    + * 
    + * <p/>
    + * This sink provides two constructors:
    + * <p/>
    + * {@link #FlinkRethinkDbSink(String hostname, int hostport, String database, String table, JSONSerializationSchema schema)}, and
    + * <p/>
    + * {@link #FlinkRethinkDbSink(String hostname, int hostport, String database, String table, JSONSerializationSchema schema, ConflictStrategy conflictStrategy)}
    + * <p/>
    + * 
    + * The parameters for the constructors are as follows:
    + * <p/>
    + * <ul>
    + * <li>hostname - the rethinkdb hostname</li>
    + * <li>hostport - the rethinkdb port for the driver to connect</li>
    + * <li>database - the rethinkdb database name to which the table belongs</li>
    + * <li>table - the rethinkdb table name where documents are inserted</li>
    + * <li>schema - the schema transformer that converts input to JSONObject, or JSONArray</li>
    + * <li>conflictStrategy - the conflict resolution strategy in case inserted document has id which exists in the db</li>
    + * </ul>
    + * <p/>
    + *
    + * The user can also set:
    + * <p/>
    + * <ul>
    + * <li>username - default is admin</li>
    + * <li>password - default is blank</li>
    + * </ul>
    + * <p/> with the {@link #setUsernameAndPassword(String, String)} method.
    + * <p/>
    + * <b>NOTE: If multiple documents are inserted (e.g. using a JSONArray), the sink
    + * checks the error entry in the result HashMap and throws a runtime exception if the error
    + * count is not zero.  The exception message contains the result HashMap.
    + * In case of multiple errors, only the first error is noted in the result HashMap.
    + * </b>
    + * 
    + * @see {@link ConflictStrategy} for conflict resolution strategies
    + * 
    + * @param <OUT> a value that can be transformed into a {@link org.json.simple.JSONArray} or {@link org.json.simple.JSONObject}
    + */
    +public class FlinkRethinkDbSink<OUT> extends RichSinkFunction<OUT> implements Serializable{
    --- End diff --
    
    This class should be named RethinkDBSink

