Posted to issues@flink.apache.org by fpompermaier <gi...@git.apache.org> on 2016/04/14 14:26:39 UTC

[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

GitHub user fpompermaier opened a pull request:

    https://github.com/apache/flink/pull/1885

    Improved JDBCInputFormat (FLINK-3750) and other fixes

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ X ] General
      - The pull request references the related JIRA issue
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message
    
    - [ X ] Documentation (**Except web site**)
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ X ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed
    
    -------------------------------------
    Improved JDBCInputFormat (FLINK-3750), fixed test Exceptions in the
    @AfterClass due to unclosed resources and removed unnecessary close()
    
    Still to be discussed: how to handle the several TODOs added in the code.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fpompermaier/flink FLINK-3750

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1885.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1885
    
----
commit af2b74f707bf7cbabdbfe14b5c17be68c252aa22
Author: Flavio Pompermaier <f....@gmail.com>
Date:   2016-04-14T12:18:06Z

    Improved JDBCInputFormat (FLINK-3750), fixed test Exceptions in the
    @AfterClass due to unclosed resources and removed unnecessary close()

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59711282
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -139,13 +163,10 @@ public void close() throws IOException {
     	@Override
     	public boolean reachedEnd() throws IOException {
     		try {
    -			if (resultSet.isLast()) {
    -				close();
    -				return true;
    -			}
    -			return false;
    +			return !resultSet.next();
    --- End diff --
    
    This will skip records if reachedEnd() is called multiple times in a row.
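To make the concern concrete, here is a minimal sketch in which a hypothetical `Cursor` class mimics `java.sql.ResultSet.next()` over an in-memory list (no database involved). Because `next()` both tests for and moves to the next row, a caller that checks `reachedEnd()` twice per record silently drops every other row.

```java
import java.util.ArrayList;
import java.util.List;

public class ReachedEndSkipDemo {

    /** Hypothetical stand-in for java.sql.ResultSet: next() advances and reports whether a row exists. */
    static final class Cursor {
        private final List<String> rows;
        private int index = -1; // positioned before the first row, like a freshly opened ResultSet

        Cursor(List<String> rows) {
            this.rows = rows;
        }

        boolean next() {
            index++;
            return index < rows.size();
        }

        String current() {
            return rows.get(index);
        }
    }

    /** The proposed implementation: reachedEnd() moves the cursor as a side effect. */
    static boolean reachedEnd(Cursor cursor) {
        return !cursor.next();
    }

    /** Reads all records while calling reachedEnd() twice per iteration. */
    static List<String> readWithDoubleCheck(Cursor cursor) {
        List<String> out = new ArrayList<>();
        while (!reachedEnd(cursor)) {
            if (reachedEnd(cursor)) { // the second check advances the cursor again
                break;
            }
            out.add(cursor.current());
        }
        return out;
    }

    public static void main(String[] args) {
        Cursor cursor = new Cursor(List.of("row1", "row2", "row3", "row4"));
        System.out.println(readWithDoubleCheck(cursor)); // prints [row2, row4]
    }
}
```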



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59722992
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -232,7 +255,7 @@ private void addValue(OUT reuse) throws IOException, SQLException {
     						reuse.setField(resultSet.getInt(pos + 1), pos);
     						break;
     					case java.sql.Types.FLOAT:
    -						reuse.setField(resultSet.getDouble(pos + 1), pos);
    +						reuse.setField(resultSet.getFloat(pos + 1), pos);
    --- End diff --
    
    I'm aware of the confusion. However, that is something users will have to live with; we can't just change the data so that users don't run into an exception they _should_ run into.
    
    All we can do is document the mapping and hope that people will read it.



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59712441
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---
    @@ -58,54 +52,39 @@ public static void setUpClass() {
     
     	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
     		System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL");
    -		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
    -		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
    -		conn = DriverManager.getConnection(dbURL);
    +		Class.forName(JDBCExample.DRIVER_CLASS);
    +		conn = DriverManager.getConnection(JDBCExample.DB_URL+";create=true");
     		createTable();
     		insertDataToSQLTable();
     		conn.close();
     	}
     
     	private static void createTable() throws SQLException {
    -		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
    -		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
    -		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
    -		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
    -		sqlQueryBuilder.append("PRIMARY KEY (id))");
    -
     		Statement stat = conn.createStatement();
    -		stat.executeUpdate(sqlQueryBuilder.toString());
    +		stat.executeUpdate(JDBCExample.getCreateQuery());
    --- End diff --
    
    No problem for me. Maybe it would be worth creating a test utility class rather than moving everything into the InputFormat test class?
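A minimal sketch of such a utility class, under the hypothetical name `JDBCTestUtil`. The driver class, database URL and DDL are the values the diff removes from `JDBCInputFormatTest`; the test and the example could then both reference this single definition instead of one depending on the other.

```java
/**
 * Hypothetical shared test utility. The constants and DDL mirror the values
 * previously hard-coded in JDBCInputFormatTest.
 */
public final class JDBCTestUtil {

    public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
    public static final String DB_URL = "jdbc:derby:memory:ebookshop";
    public static final String TABLE_NAME = "books";

    private JDBCTestUtil() {
        // static holder, not meant to be instantiated
    }

    /** URL that asks Derby to create the in-memory database on the first connection. */
    public static String createDbUrl() {
        return DB_URL + ";create=true";
    }

    /** The same DDL the test used to assemble inline with a StringBuilder. */
    public static String getCreateQuery() {
        return "CREATE TABLE " + TABLE_NAME + " ("
                + "id INT NOT NULL DEFAULT 0,"
                + "title VARCHAR(50) DEFAULT NULL,"
                + "author VARCHAR(50) DEFAULT NULL,"
                + "price FLOAT DEFAULT NULL,"
                + "qty INT DEFAULT NULL,"
                + "PRIMARY KEY (id))";
    }
}
```

In the test, `prepareDerbyDatabase()` would then call `Class.forName(JDBCTestUtil.DRIVER_CLASS)` and `DriverManager.getConnection(JDBCTestUtil.createDbUrl())`.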



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59709889
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -81,19 +90,34 @@ public void configure(Configuration parameters) {
     	 * @throws IOException
     	 */
     	@Override
    -	public void open(InputSplit ignored) throws IOException {
    +	public void open(InputSplit inputSplit) throws IOException {
     		try {
    +			//TODO is this performed once per Task Manager..?
     			establishConnection();
     			statement = dbConn.createStatement(resultSetType, resultSetConcurrency);
    +			String query = queryTemplate;
    +			if(isSplitConfigured()){
    +				RangeInputSplit jdbcInputSplit = (RangeInputSplit) inputSplit;
    +				long start = jdbcInputSplit.getMin();
    +				long end = jdbcInputSplit.getMax();
    +				if(isSplitConfigured()){
    +					query = queryTemplate.replace(CONDITIONS, String.format(BETWEEN, splitColumnName, start, end));
    +				}
    +			}
    +			LOG.debug(query);
     			resultSet = statement.executeQuery(query);
     		} catch (SQLException se) {
    -			close();
    +			//close(); already closed by the caller
     			throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
     		} catch (ClassNotFoundException cnfe) {
     			throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
     		}
     	}
     
    +	private boolean isSplitConfigured() {
    +		return splitColumnName!=null;
    --- End diff --
    
    missing spaces around !=



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59712650
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -81,19 +90,34 @@ public void configure(Configuration parameters) {
     	 * @throws IOException
     	 */
     	@Override
    -	public void open(InputSplit ignored) throws IOException {
    +	public void open(InputSplit inputSplit) throws IOException {
     		try {
    +			//TODO is this performed once per Task Manager..?
     			establishConnection();
     			statement = dbConn.createStatement(resultSetType, resultSetConcurrency);
    +			String query = queryTemplate;
    +			if(isSplitConfigured()){
    +				RangeInputSplit jdbcInputSplit = (RangeInputSplit) inputSplit;
    +				long start = jdbcInputSplit.getMin();
    +				long end = jdbcInputSplit.getMax();
    +				if(isSplitConfigured()){
    --- End diff --
    
    You're right :)
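With the redundant inner check gone, the query construction in `open()` needs only one branch. The sketch below isolates it as a pure function; the `$CONDITIONS` token comes from this discussion, while the exact `BETWEEN` format string and the `buildQuery` helper are assumptions, since the diff does not show their definitions.

```java
public final class SplitQueryBuilder {

    // Assumed values: the diff references CONDITIONS and BETWEEN but does not show them.
    static final String CONDITIONS = "$CONDITIONS";
    static final String BETWEEN = "(%s BETWEEN %d AND %d)";

    private SplitQueryBuilder() {
    }

    /**
     * Returns the query for one split. Without a split column the template is used unchanged;
     * otherwise the token is replaced by an inclusive range predicate on that column.
     */
    static String buildQuery(String queryTemplate, String splitColumnName, long start, long end) {
        if (splitColumnName == null) {
            return queryTemplate;
        }
        // String.replace matches literally, so the '$' in the token needs no escaping
        return queryTemplate.replace(CONDITIONS, String.format(BETWEEN, splitColumnName, start, end));
    }

    public static void main(String[] args) {
        System.out.println(buildQuery("SELECT * FROM books WHERE $CONDITIONS", "id", 1001, 1500));
        // prints: SELECT * FROM books WHERE (id BETWEEN 1001 AND 1500)
    }
}
```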



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59723867
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -139,13 +163,10 @@ public void close() throws IOException {
     	@Override
     	public boolean reachedEnd() throws IOException {
     		try {
    -			if (resultSet.isLast()) {
    -				close();
    -				return true;
    -			}
    -			return false;
    +			return !resultSet.next();
    --- End diff --
    
    "Just call it once" isn't a particularly good argument; I could just as well say "don't try to read from an empty table".
    
    You could use an isConsumed flag to prevent records from being skipped.
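A minimal sketch of the flag-based approach, assuming an in-memory list in place of the `ResultSet` (the class name and the `advance()` helper are hypothetical). `reachedEnd()` only moves the cursor after `nextRecord()` has handed out the current row, so extra calls are harmless. Once the end is reached, the cursor is not advanced again, which also sidesteps drivers that throw on a further `next()` call on a forward-only result set.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ConsumedFlagReader {

    // Hypothetical stand-in for java.sql.ResultSet; advance() plays the role of next().
    private final Iterator<String> rows;
    private String currentRow;

    private boolean isConsumed = true; // the current row (if any) has already been returned
    private boolean hasNext;           // result of the last advance()

    public ConsumedFlagReader(List<String> rows) {
        this.rows = rows.iterator();
    }

    /** Emulates ResultSet.next(): moves to the next row and reports whether it exists. */
    private boolean advance() {
        if (rows.hasNext()) {
            currentRow = rows.next();
            return true;
        }
        return false;
    }

    /** Safe to call any number of times: the cursor only moves once the current row is consumed. */
    public boolean reachedEnd() {
        if (isConsumed) {
            hasNext = advance();
            isConsumed = false;
        }
        return !hasNext;
    }

    /** Returns the current row and marks it consumed, or null once the input is exhausted. */
    public String nextRecord() {
        if (reachedEnd()) {
            return null;
        }
        isConsumed = true;
        return currentRow;
    }

    public static void main(String[] args) {
        ConsumedFlagReader reader = new ConsumedFlagReader(List.of("row1", "row2", "row3"));
        List<String> out = new ArrayList<>();
        while (!reader.reachedEnd()) {
            reader.reachedEnd(); // a redundant call no longer skips a row
            out.add(reader.nextRecord());
        }
        System.out.println(out); // prints [row1, row2, row3]
    }
}
```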



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59711855
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -241,10 +264,31 @@ private void addValue(OUT reuse) throws IOException, SQLException {
     						reuse.setField(resultSet.getDouble(pos + 1), pos);
     						break;
     					case java.sql.Types.DECIMAL:
    -						reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
    -						break;
    +						//TODO manage null fields
    +						try {
    +							if (o == null){
    +								reuse.setField("", pos);
    +							} else{
    +								reuse.setField(resultSet.getBigDecimal(pos + 1).toPlainString(), pos);
    +							}
    +						} catch (SQLException e) {
    +							System.err.println("error reading at position " + pos + " setting blank field!");
    --- End diff --
    
    This behavior is inconsistent with other types: either always fail or always set a default value.
    
    In any case, such an error should be logged.
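One way to follow this advice, picking the "always fail" option: let object-returning getters such as `getBigDecimal` pass SQL NULL through as `null`, and turn a read error into a logged `IOException` instead of a blank field plus a `System.err` message. The `ColumnReader` interface is a hypothetical seam so the sketch runs without a database (in the input format it would be `resultSet::getBigDecimal`), and `java.util.logging` stands in for the class's existing `LOG` only to keep the sketch dependency-free.

```java
import java.io.IOException;
import java.sql.SQLException;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class FieldReaders {

    private static final Logger LOG = Logger.getLogger(FieldReaders.class.getName());

    /** Hypothetical abstraction over ResultSet getters, e.g. resultSet::getBigDecimal. */
    @FunctionalInterface
    interface ColumnReader<T> {
        T read(int columnIndex) throws SQLException;
    }

    private FieldReaders() {
    }

    /**
     * Reads field pos (0-based) from JDBC column pos + 1. Object-returning getters already yield
     * null for SQL NULL, so no placeholder such as "" is substituted. A read error is logged with
     * its position and rethrown, so every type fails the same way.
     */
    static <T> T readField(ColumnReader<T> reader, int pos) throws IOException {
        try {
            return reader.read(pos + 1);
        } catch (SQLException e) {
            String msg = "Error reading field at position " + pos;
            LOG.log(Level.SEVERE, msg, e);
            throw new IOException(msg, e);
        }
    }
}
```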



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59729313
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyUtil.java ---
    @@ -19,7 +19,6 @@
     
     import java.io.OutputStream;
     
    -@SuppressWarnings("unused")
    --- End diff --
    
    Now I'll use the class to generate the value of the _derby.stream.error.field_ property.
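A minimal sketch of that idea: derive the property value from the class instead of hard-coding the string, so a rename or package move cannot silently break it. Derby expects `derby.stream.error.field` to name a public static `OutputStream` or `Writer` field by its fully qualified class name followed by the field name; the `silenceDerbyLog()` helper is hypothetical.

```java
import java.io.OutputStream;

public final class DerbyUtil {

    /** Discards everything written to it; used instead of Derby's default derby.log file. */
    public static final OutputStream DEV_NULL = new OutputStream() {
        @Override
        public void write(int b) {
            // intentionally ignore the byte
        }
    };

    private DerbyUtil() {
    }

    /** Hypothetical helper: points Derby's error stream at DEV_NULL using the class's own name. */
    public static void silenceDerbyLog() {
        System.setProperty("derby.stream.error.field",
                DerbyUtil.class.getCanonicalName() + ".DEV_NULL");
    }
}
```

The helper has to run before the embedded driver boots, i.e. before the first `DriverManager.getConnection(...)` call, which is where the test currently sets the property.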



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59712089
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -363,14 +439,31 @@ public JDBCInputFormat finish() {
     			if (format.dbURL == null) {
     				throw new IllegalArgumentException("No database URL supplied");
     			}
    -			if (format.query == null) {
    +			if (format.queryTemplate == null) {
     				throw new IllegalArgumentException("No query supplied");
     			}
     			if (format.drivername == null) {
     				throw new IllegalArgumentException("No driver supplied");
     			}
    +			adjustQueryTemplateIfNecessary();
     			return format;
     		}
    +
    +		/** Try to add $CONDITIONS token automatically (at least for straightforward cases) */
    +		private void adjustQueryTemplateIfNecessary() {
    --- End diff --
    
    Indeed, I only try to fix the query if it is a simple SELECT (by adding the WHERE clause). Otherwise it fails (I agree that it is very dangerous to try to guess where to put the $CONDITIONS token).
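A conservative version of that rule might look like the sketch below: keep templates that already contain `$CONDITIONS`, append `WHERE $CONDITIONS` only to a bare `SELECT <columns> FROM <table>`, and reject everything else. The regular expression and the `adjustQueryTemplate` signature are assumptions rather than the PR's code; the pattern deliberately errs on the side of rejecting (aliases, joins, function calls and existing WHERE clauses all fail it).

```java
import java.util.regex.Pattern;

public final class QueryTemplates {

    static final String CONDITIONS = "$CONDITIONS";

    // "SELECT <plain columns> FROM <single table>" and nothing after it, case-insensitive.
    private static final Pattern SIMPLE_SELECT = Pattern.compile(
            "(?i)^\\s*SELECT\\s+[\\w\\s,.*]+?\\s+FROM\\s+[\\w.]+\\s*$");

    private QueryTemplates() {
    }

    /** Returns a template containing $CONDITIONS, or throws if the token cannot be placed safely. */
    static String adjustQueryTemplate(String query) {
        if (query.contains(CONDITIONS)) {
            return query; // the user already placed the token
        }
        if (SIMPLE_SELECT.matcher(query).matches()) {
            return query.trim() + " WHERE " + CONDITIONS;
        }
        throw new IllegalArgumentException(
                "Cannot safely add " + CONDITIONS + ", please add it to the query manually: " + query);
    }
}
```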



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59711573
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/RangeInputSplit.java ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.core.io.InputSplit;
    +
    +/**
    + * A range input split provides information about a particular range of keys, each with a min and a max value. 
    + */
    +@Public
    +public class RangeInputSplit implements InputSplit{
    --- End diff --
    
    Aren't those checks already performed by the Checkstyle plugin? `mvn clean verify` passed without errors... maybe it would be worth adding these checks too!



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59851873
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -119,7 +118,11 @@ private boolean isSplitConfigured() {
     		return splitColumnName != null;
     	}
     
    +	/** Perform initialization of the JDBC connection. Done just once per parallel task */
     	private void establishConnection() throws SQLException, ClassNotFoundException {
    +		if(dbConn!=null){
    --- End diff --
    
    It was just to keep the connection-establishing logic in a single method. No problem for me to move the check into the open() method.



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier closed the pull request at:

    https://github.com/apache/flink/pull/1885



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59847704
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -119,7 +118,11 @@ private boolean isSplitConfigured() {
     		return splitColumnName != null;
     	}
     
    +	/** Perform initialization of the JDBC connection. Done just once per parallel task */
     	private void establishConnection() throws SQLException, ClassNotFoundException {
    +		if(dbConn!=null){
    --- End diff --
    
    It just seems odd that we call this method n times only for it to do nothing in n-1 cases.
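A minimal sketch of the suggestion: `open()` performs the null check and `establishConnection()` connects unconditionally. A hypothetical `ConnectionFactory` replaces the real driver loading and `DriverManager` call so the behavior can be checked without a database; the counter exists only to show that the connection is created once.

```java
import java.sql.SQLException;

public class LazyConnectionExample<C> {

    /** Hypothetical factory standing in for the real driver-loading and DriverManager logic. */
    @FunctionalInterface
    interface ConnectionFactory<T> {
        T connect() throws SQLException, ClassNotFoundException;
    }

    private final ConnectionFactory<C> factory;
    private C dbConn;
    private int connectCalls; // illustration only: how often a connection was really created

    LazyConnectionExample(ConnectionFactory<C> factory) {
        this.factory = factory;
    }

    /** Called once per split; the check makes it explicit that only the first call connects. */
    public void open() throws SQLException, ClassNotFoundException {
        if (dbConn == null) {
            establishConnection();
        }
        // ... create the statement, build the split query and execute it ...
    }

    /** Unconditionally creates the connection; the caller decides whether that is needed. */
    private void establishConnection() throws SQLException, ClassNotFoundException {
        dbConn = factory.connect();
        connectCalls++;
    }

    int connectCalls() {
        return connectCalls;
    }
}
```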



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59711015
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/RangeInputSplit.java ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.core.io.InputSplit;
    +
    +/**
    + * A range input split provides information about a particular range of keys, each with a min and a max value. 
    + */
    +@Public
    --- End diff --
    
    As of right now, connectors don't use annotations. In any case, a @Public annotation doesn't really make sense, since this class isn't even exposed to the user.



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59710635
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---
    @@ -58,54 +52,39 @@ public static void setUpClass() {
     
     	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
     		System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL");
    -		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
    -		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
    -		conn = DriverManager.getConnection(dbURL);
    +		Class.forName(JDBCExample.DRIVER_CLASS);
    +		conn = DriverManager.getConnection(JDBCExample.DB_URL+";create=true");
     		createTable();
     		insertDataToSQLTable();
     		conn.close();
     	}
     
     	private static void createTable() throws SQLException {
    -		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
    -		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
    -		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
    -		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
    -		sqlQueryBuilder.append("PRIMARY KEY (id))");
    -
     		Statement stat = conn.createStatement();
    -		stat.executeUpdate(sqlQueryBuilder.toString());
    +		stat.executeUpdate(JDBCExample.getCreateQuery());
    --- End diff --
    
    Can we do this the other way around, i.e., keep the data in the test file and use it in the example?



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59712434
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -241,10 +264,31 @@ private void addValue(OUT reuse) throws IOException, SQLException {
     						reuse.setField(resultSet.getDouble(pos + 1), pos);
     						break;
     					case java.sql.Types.DECIMAL:
    -						reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
    -						break;
    +						//TODO manage null fields
    +						try {
    +							if (o == null){
    +								reuse.setField("", pos);
    +							} else{
    +								reuse.setField(resultSet.getBigDecimal(pos + 1).toPlainString(), pos);
    --- End diff --
    
    We should probably do it properly this time and just return a BigDecimal.
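For context, the previous mapping (`getBigDecimal(...).doubleValue()`) can silently lose digits for wide DECIMAL columns, since a double only carries about 15 to 17 significant decimal digits. A small self-contained check (the sample value is arbitrary, not taken from the test data):

```java
import java.math.BigDecimal;

public class DecimalPrecision {

    /** True if converting to double changes the numeric value (new BigDecimal(double) is exact). */
    static boolean losesPrecisionAsDouble(BigDecimal value) {
        BigDecimal roundTripped = new BigDecimal(value.doubleValue());
        return roundTripped.compareTo(value) != 0;
    }

    public static void main(String[] args) {
        // A DECIMAL(22,2) value: more significant digits than a double can hold.
        BigDecimal price = new BigDecimal("12345678901234567890.12");
        System.out.println(losesPrecisionAsDouble(price));                  // true
        System.out.println(losesPrecisionAsDouble(new BigDecimal("0.5")));  // false, exactly representable
    }
}
```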



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59712531
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/RangeInputSplit.java ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.core.io.InputSplit;
    +
    +/**
    + * A range input split provides information about a particular range of keys, each with a min and a max value. 
    + */
    +@Public
    --- End diff --
    
    Ok



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59712084
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -232,7 +255,7 @@ private void addValue(OUT reuse) throws IOException, SQLException {
     						reuse.setField(resultSet.getInt(pos + 1), pos);
     						break;
     					case java.sql.Types.FLOAT:
    -						reuse.setField(resultSet.getDouble(pos + 1), pos);
    +						reuse.setField(resultSet.getFloat(pos + 1), pos);
    --- End diff --
    
    AFAIK, SQL FLOAT is closer to a Java double. I wouldn't change this.
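This agrees with the default mapping in the JDBC specification: `REAL` maps to a Java `float`, while `FLOAT` and `DOUBLE` both map to a Java `double` (and `DECIMAL`/`NUMERIC` to `java.math.BigDecimal`). A sketch of writing that mapping down next to the code, limited to the numeric types discussed in this thread; the `javaTypeFor` helper is hypothetical.

```java
import java.math.BigDecimal;
import java.sql.Types;

public final class JdbcTypeMapping {

    private JdbcTypeMapping() {
    }

    /** Java class for a java.sql.Types code, following the JDBC default mapping for these types. */
    static Class<?> javaTypeFor(int sqlType) {
        switch (sqlType) {
            case Types.REAL:
                return Float.class;        // single precision
            case Types.FLOAT:              // SQL FLOAT is double precision in JDBC
            case Types.DOUBLE:
                return Double.class;
            case Types.DECIMAL:
            case Types.NUMERIC:
                return BigDecimal.class;   // exact, arbitrary precision
            default:
                throw new IllegalArgumentException("Type not covered by this sketch: " + sqlType);
        }
    }
}
```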



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59709757
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/RangeInputSplit.java ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.core.io.InputSplit;
    +
    +/**
    + * A range input split provides information about a particular range of keys, each with a min and a max value. 
    + */
    +@Public
    +public class RangeInputSplit implements InputSplit{
    --- End diff --
    
    missing space before curly brace
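For reference, a sketch of the split class with the spacing fixed and without the `@Public` annotation, plus a helper that divides a key range into contiguous splits. To compile without Flink, a local interface stands in for `org.apache.flink.core.io.InputSplit`; the constructor, fields and `createSplits` helper are assumptions, not the PR's code.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Local stand-in for org.apache.flink.core.io.InputSplit so the sketch compiles on its own. */
interface InputSplit extends Serializable {
    int getSplitNumber();
}

/** A split covering the inclusive key range [min, max]. */
public class RangeInputSplit implements InputSplit {

    private static final long serialVersionUID = 1L;

    private final int splitNumber;
    private final long min;
    private final long max;

    public RangeInputSplit(int splitNumber, long min, long max) {
        if (min > max) {
            throw new IllegalArgumentException("min must not exceed max");
        }
        this.splitNumber = splitNumber;
        this.min = min;
        this.max = max;
    }

    @Override
    public int getSplitNumber() {
        return splitNumber;
    }

    public long getMin() {
        return min;
    }

    public long getMax() {
        return max;
    }

    /**
     * Divides [min, max] into at most numSplits contiguous, non-overlapping inclusive ranges.
     * Assumes the range is far enough from Long.MIN_VALUE/MAX_VALUE that the arithmetic cannot overflow.
     */
    public static List<RangeInputSplit> createSplits(long min, long max, int numSplits) {
        if (numSplits <= 0 || min > max) {
            throw new IllegalArgumentException("need numSplits > 0 and min <= max");
        }
        long total = max - min + 1;
        long size = (total + numSplits - 1) / numSplits; // ceiling division
        List<RangeInputSplit> splits = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            long start = min + i * size;
            if (start > max) {
                break; // fewer keys than requested splits
            }
            long end = Math.min(start + size - 1, max);
            splits.add(new RangeInputSplit(i, start, end));
        }
        return splits;
    }
}
```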



[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59712579
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---
    @@ -58,54 +52,39 @@ public static void setUpClass() {
     
     	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
     		System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL");
    -		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
    -		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
    -		conn = DriverManager.getConnection(dbURL);
    +		Class.forName(JDBCExample.DRIVER_CLASS);
    +		conn = DriverManager.getConnection(JDBCExample.DB_URL+";create=true");
     		createTable();
     		insertDataToSQLTable();
     		conn.close();
     	}
     
     	private static void createTable() throws SQLException {
    -		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
    -		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
    -		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
    -		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
    -		sqlQueryBuilder.append("PRIMARY KEY (id))");
    -
     		Statement stat = conn.createStatement();
    -		stat.executeUpdate(sqlQueryBuilder.toString());
    +		stat.executeUpdate(JDBCExample.getCreateQuery());
     		stat.close();
     	}
     
    -	private static void insertDataToSQLTable() throws SQLException {
    -		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
    -		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
    -		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
    -		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
    -		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
    -		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
     
    +	private static void insertDataToSQLTable() throws SQLException {
     		Statement stat = conn.createStatement();
    -		stat.execute(sqlQueryBuilder.toString());
    +		stat.execute(JDBCExample.getInsertQuery());
     		stat.close();
     	}
     
    +	
    --- End diff --
    
    unnecessary new lines


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on the pull request:

    https://github.com/apache/flink/pull/1885#issuecomment-212479891
  
    Depends on FLINK-3777 (https://github.com/apache/flink/pull/1903) to properly manage instantiating a single connection.


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59709702
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -81,19 +90,34 @@ public void configure(Configuration parameters) {
     	 * @throws IOException
     	 */
     	@Override
    -	public void open(InputSplit ignored) throws IOException {
    +	public void open(InputSplit inputSplit) throws IOException {
     		try {
    +			//TODO is this performed once per Task Manager..?
     			establishConnection();
     			statement = dbConn.createStatement(resultSetType, resultSetConcurrency);
    +			String query = queryTemplate;
    +			if(isSplitConfigured()){
    +				RangeInputSplit jdbcInputSplit = (RangeInputSplit) inputSplit;
    +				long start = jdbcInputSplit.getMin();
    +				long end = jdbcInputSplit.getMax();
    +				if(isSplitConfigured()){
    +					query = queryTemplate.replace(CONDITIONS, String.format(BETWEEN, splitColumnName, start, end));
    +				}
    +			}
    +			LOG.debug(query);
     			resultSet = statement.executeQuery(query);
     		} catch (SQLException se) {
    -			close();
    +			//close(); already closed by the caller
    --- End diff --
    
    This is not guaranteed, so please add the close() calls back.
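A minimal sketch of the pattern the reviewer is asking for: open() releases whatever it acquired before rethrowing, instead of relying on the caller to invoke close(). The class SafeOpenFormat and its fields are hypothetical stand-ins; the real JDBCInputFormat holds a Connection, Statement and ResultSet rather than a boolean flag.

```java
import java.io.IOException;

// Hypothetical stand-in for the input format, runnable without a database.
class SafeOpenFormat {
    boolean resourcesOpen = false; // stands in for dbConn/statement/resultSet
    int closeCalls = 0;

    void open(boolean failQuery) throws IOException {
        try {
            resourcesOpen = true; // e.g. establishConnection() + createStatement()
            if (failQuery) {
                throw new java.sql.SQLException("simulated query failure");
            }
        } catch (java.sql.SQLException se) {
            // Do not assume the caller will call close(): release what was
            // opened here, then rethrow wrapped as the IOException open() declares.
            close();
            throw new IOException("open() failed: " + se.getMessage(), se);
        }
    }

    void close() {
        closeCalls++;
        resourcesOpen = false;
    }
}
```

Keeping the cleanup inside open() means a failed split cannot leak a connection, whatever the runtime does afterwards.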


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59722991
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -232,7 +255,7 @@ private void addValue(OUT reuse) throws IOException, SQLException {
     						reuse.setField(resultSet.getInt(pos + 1), pos);
     						break;
     					case java.sql.Types.FLOAT:
    -						reuse.setField(resultSet.getDouble(pos + 1), pos);
    +						reuse.setField(resultSet.getFloat(pos + 1), pos);
    --- End diff --
    
    I'm aware of the confusion. However, that is something users will have to live with; we can't just change the data so that a user doesn't run into an exception he _should_ run into.
    
    All we can do is document the mapping and hope that people will read it.
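One way to "document the mapping" is to spell it out in code. The sketch below follows the default SQL-to-Java mappings from the JDBC specification, where SQL FLOAT is double precision (like DOUBLE) and only REAL is single precision; this is why a tuple field declared as Float fails for a FLOAT column. The class JdbcTypeMapping is hypothetical, and JDBCInputFormat's own conversions (for example for DECIMAL) may deviate from this table.

```java
import java.math.BigDecimal;
import java.sql.Types;

// Hypothetical documentation helper: Java class a tuple field should declare
// for a given java.sql.Types code, per the JDBC default mappings.
final class JdbcTypeMapping {
    private JdbcTypeMapping() {
    }

    static Class<?> javaClassFor(int sqlType) {
        switch (sqlType) {
            case Types.INTEGER:
                return Integer.class;
            case Types.BIGINT:
                return Long.class;
            case Types.REAL:      // single precision
                return Float.class;
            case Types.FLOAT:     // JDBC FLOAT is double precision
            case Types.DOUBLE:
                return Double.class;
            case Types.DECIMAL:
            case Types.NUMERIC:
                return BigDecimal.class;
            case Types.CHAR:
            case Types.VARCHAR:
                return String.class;
            default:
                throw new IllegalArgumentException("Unmapped SQL type: " + sqlType);
        }
    }
}
```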


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59847510
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -119,7 +118,11 @@ private boolean isSplitConfigured() {
     		return splitColumnName != null;
     	}
     
    +	/** Perform initialization of the JDBC connection. Done just once per parallel task */
     	private void establishConnection() throws SQLException, ClassNotFoundException {
    +		if(dbConn!=null){
    --- End diff --
    
    I think we should make this check directly in open(), around the establishConnection() call.
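A sketch of the suggested placement: the null check lives in open(), and establishConnection() stays a plain "connect" method. ConnectionReuse and its Supplier-based connector are hypothetical; the type parameter C stands in for java.sql.Connection so the snippet runs without a database. It assumes, as the diff's comment states, that one format instance is opened repeatedly within a parallel task.

```java
import java.util.function.Supplier;

// Hypothetical sketch; C plays the role of java.sql.Connection.
class ConnectionReuse<C> {
    private final Supplier<C> connector; // stands in for DriverManager.getConnection(...)
    private C dbConn;
    int connectCalls = 0;

    ConnectionReuse(Supplier<C> connector) {
        this.connector = connector;
    }

    void open() {
        // Guard at the call site: connect only if no connection exists yet.
        if (dbConn == null) {
            dbConn = establishConnection();
        }
        // ... create the statement and run the split's query ...
    }

    private C establishConnection() {
        connectCalls++;
        return connector.get();
    }
}
```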


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59710345
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -363,14 +439,31 @@ public JDBCInputFormat finish() {
     			if (format.dbURL == null) {
     				throw new IllegalArgumentException("No database URL supplied");
     			}
    -			if (format.query == null) {
    +			if (format.queryTemplate == null) {
     				throw new IllegalArgumentException("No query supplied");
     			}
     			if (format.drivername == null) {
     				throw new IllegalArgumentException("No driver supplied");
     			}
    +			adjustQueryTemplateIfNecessary();
     			return format;
     		}
    +
    +		/** Try to add $CONDITIONS token automatically (at least for straightforward cases) */
    +		private void adjustQueryTemplateIfNecessary() {
    --- End diff --
    
    We should simply fail if a user wants to use splits but does not supply the $CONDITIONS token. This is murky territory, IMO.
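A minimal sketch of "simply fail" in the builder's finish(), replacing the automatic query rewriting. The value "$CONDITIONS" is assumed from the diff's comment; QueryTemplateCheck, queryTemplate and splitColumnName are hypothetical stand-ins for the builder and its fields.

```java
// Hypothetical validation step for the builder's finish().
final class QueryTemplateCheck {
    static final String CONDITIONS = "$CONDITIONS"; // assumed token value

    private QueryTemplateCheck() {
    }

    static void validate(String queryTemplate, String splitColumnName) {
        // Splitting requested but nowhere to inject the range predicate: reject.
        if (splitColumnName != null && !queryTemplate.contains(CONDITIONS)) {
            throw new IllegalArgumentException(
                    "Splitting on column '" + splitColumnName + "' requires the query to contain "
                            + CONDITIONS + ", e.g. SELECT * FROM books WHERE " + CONDITIONS);
        }
    }
}
```

Failing early keeps the builder free of guesses about where a WHERE clause belongs in arbitrary SQL.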


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59712726
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -139,13 +163,10 @@ public void close() throws IOException {
     	@Override
     	public boolean reachedEnd() throws IOException {
     		try {
    -			if (resultSet.isLast()) {
    -				close();
    -				return true;
    -			}
    -			return false;
    +			return !resultSet.next();
    --- End diff --
    
    does this really happen??


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59712803
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/RangeInputSplit.java ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.core.io.InputSplit;
    +
    +/**
    + * A range input split provides information about a particular range of keys, each with a min and a max value. 
    + */
    +@Public
    +public class RangeInputSplit implements InputSplit{
    --- End diff --
    
    We decided against using checkstyle for these "minor" things, as they would break too many parts of the existing codebase.


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59728464
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -139,13 +163,10 @@ public void close() throws IOException {
     	@Override
     	public boolean reachedEnd() throws IOException {
     		try {
    -			if (resultSet.isLast()) {
    -				close();
    -				return true;
    -			}
    -			return false;
    +			return !resultSet.next();
    --- End diff --
    
    I found a fix for this. I'll push it ASAP.


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on the pull request:

    https://github.com/apache/flink/pull/1885#issuecomment-214813684
  
    Sorry, I messed up...


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59728967
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -232,7 +255,7 @@ private void addValue(OUT reuse) throws IOException, SQLException {
     						reuse.setField(resultSet.getInt(pos + 1), pos);
     						break;
     					case java.sql.Types.FLOAT:
    -						reuse.setField(resultSet.getDouble(pos + 1), pos);
    +						reuse.setField(resultSet.getFloat(pos + 1), pos);
    --- End diff --
    
    OK for me... however, that should definitely be well documented.


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59728344
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---
    @@ -58,54 +52,39 @@ public static void setUpClass() {
     
     	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
     		System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL");
    -		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
    -		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
    -		conn = DriverManager.getConnection(dbURL);
    +		Class.forName(JDBCExample.DRIVER_CLASS);
    +		conn = DriverManager.getConnection(JDBCExample.DB_URL+";create=true");
     		createTable();
     		insertDataToSQLTable();
     		conn.close();
     	}
     
     	private static void createTable() throws SQLException {
    -		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
    -		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
    -		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
    -		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
    -		sqlQueryBuilder.append("PRIMARY KEY (id))");
    -
     		Statement stat = conn.createStatement();
    -		stat.executeUpdate(sqlQueryBuilder.toString());
    +		stat.executeUpdate(JDBCExample.getCreateQuery());
    --- End diff --
    
    I've created a base class that is extended by both the input and output tests and used by the example. I'll push it once I've finished integrating your comments.


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59725431
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyUtil.java ---
    @@ -19,7 +19,6 @@
     
     import java.io.OutputStream;
     
    -@SuppressWarnings("unused")
    --- End diff --
    
    Since this class is only referenced indirectly (through the derby.stream.error.field system property), it still needs the @SuppressWarnings("unused") annotation.
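A sketch of why the annotation is still warranted: Derby looks up the error-log stream by the name given in the derby.stream.error.field property ("org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL" in the test setup), so no Java code mentions DEV_NULL and IDE inspections would flag it as unused. The class body below is a reconstruction for illustration; only the import, the annotation and the field name come from the thread.

```java
import java.io.OutputStream;

// Reconstructed sketch: referenced only by name through a system property.
@SuppressWarnings("unused")
class DerbyUtil {
    public static final OutputStream DEV_NULL = new OutputStream() {
        @Override
        public void write(int b) {
            // Discard Derby's error log output.
        }
    };
}
```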


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59725309
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---
    @@ -58,54 +52,39 @@ public static void setUpClass() {
     
     	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
     		System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL");
    -		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
    -		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
    -		conn = DriverManager.getConnection(dbURL);
    +		Class.forName(JDBCExample.DRIVER_CLASS);
    +		conn = DriverManager.getConnection(JDBCExample.DB_URL+";create=true");
     		createTable();
     		insertDataToSQLTable();
     		conn.close();
     	}
     
     	private static void createTable() throws SQLException {
    -		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
    -		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
    -		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
    -		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
    -		sqlQueryBuilder.append("PRIMARY KEY (id))");
    -
     		Statement stat = conn.createStatement();
    -		stat.executeUpdate(sqlQueryBuilder.toString());
    +		stat.executeUpdate(JDBCExample.getCreateQuery());
    --- End diff --
    
    Maybe I'm overthinking this, but in my opinion a change to an example shouldn't affect a test. That's why I'd rather keep them separate.


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59711539
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -241,10 +264,31 @@ private void addValue(OUT reuse) throws IOException, SQLException {
     						reuse.setField(resultSet.getDouble(pos + 1), pos);
     						break;
     					case java.sql.Types.DECIMAL:
    -						reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
    -						break;
    +						//TODO manage null fields
    +						try {
    +							if (o == null){
    +								reuse.setField("", pos);
    +							} else{
    +								reuse.setField(resultSet.getBigDecimal(pos + 1).toPlainString(), pos);
    --- End diff --
    
    This seems inefficient. Why not call resultSet.getBigDecimal right away and then check the result for null?
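A sketch of the single-lookup version the reviewer suggests. ResultSet.getBigDecimal returns null for SQL NULL, so the extra lookup feeding o is unnecessary. DecimalField is a hypothetical helper taking the already-fetched value, i.e. BigDecimal value = resultSet.getBigDecimal(pos + 1); the empty-string placeholder mirrors the diff's current choice and is still an open question in this thread.

```java
import java.math.BigDecimal;

// Hypothetical helper: `value` is the result of resultSet.getBigDecimal(pos + 1).
final class DecimalField {
    private DecimalField() {
    }

    static String toFieldValue(BigDecimal value) {
        // getBigDecimal already yields null for SQL NULL: one call, one check.
        return value == null ? "" : value.toPlainString();
    }
}
```

Inside addValue() this would read reuse.setField(DecimalField.toFieldValue(resultSet.getBigDecimal(pos + 1)), pos).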


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59709766
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/RangeInputSplit.java ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.core.io.InputSplit;
    +
    +/**
    + * A range input split provides information about a particular range of keys, each with a min and a max value. 
    + */
    +@Public
    +public class RangeInputSplit implements InputSplit{
    +
    +	private static final long serialVersionUID = 4310893817447171721L;
    +	
    +	private long min;
    +	private long max;
    +	
    +	/** The number of the split. */
    +	private final int splitNumber;
    +	
    +	public RangeInputSplit(int splitNumber, long min, long max){
    --- End diff --
    
    missing space before curly brace
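For reference, a sketch of the split class with the requested brace spacing. To compile without Flink it implements java.io.Serializable instead of org.apache.flink.core.io.InputSplit; the getters getMin() and getMax() are the ones open() calls in the diff, and getSplitNumber() is the method InputSplit requires.

```java
import java.io.Serializable;

// Sketch only: the PR's class implements org.apache.flink.core.io.InputSplit.
class RangeInputSplit implements Serializable {

    private static final long serialVersionUID = 4310893817447171721L;

    private long min;
    private long max;

    /** The number of the split. */
    private final int splitNumber;

    public RangeInputSplit(int splitNumber, long min, long max) {
        this.splitNumber = splitNumber;
        this.min = min;
        this.max = max;
    }

    public int getSplitNumber() {
        return splitNumber;
    }

    public long getMin() {
        return min;
    }

    public long getMax() {
        return max;
    }
}
```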


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59715336
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -139,13 +163,10 @@ public void close() throws IOException {
     	@Override
     	public boolean reachedEnd() throws IOException {
     		try {
    -			if (resultSet.isLast()) {
    -				close();
    -				return true;
    -			}
    -			return false;
    +			return !resultSet.next();
    --- End diff --
    
    The problem is that resultSet.isLast() returns false when the ResultSet is empty, so !resultSet.next() is the safest solution. Just call it once per record.
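A sketch of the contract that !resultSet.next() implies: reachedEnd() advances the cursor, so it must run exactly once per record, and nextRecord() must read the current row without advancing again. FakeCursor is a hypothetical stand-in mimicking ResultSet cursor semantics (positioned before the first row; next() returns false when rows run out, immediately for an empty set), so the snippet runs without a database.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for java.sql.ResultSet's cursor behaviour.
class FakeCursor {
    private final List<String> rows;
    private int pos = -1; // before the first row, like a fresh ResultSet

    FakeCursor(List<String> rows) {
        this.rows = rows;
    }

    boolean next() {
        pos++;
        return pos < rows.size();
    }

    String current() {
        return rows.get(pos);
    }
}

class CursorReader {
    private final FakeCursor resultSet;

    CursorReader(FakeCursor resultSet) {
        this.resultSet = resultSet;
    }

    // Moves the cursor: calling it twice per record would skip a row.
    boolean reachedEnd() {
        return !resultSet.next();
    }

    // Reads the row reachedEnd() moved to; must not advance the cursor.
    String nextRecord() {
        return resultSet.current();
    }

    static List<String> readAll(CursorReader reader) {
        List<String> out = new ArrayList<>();
        while (!reader.reachedEnd()) {
            out.add(reader.nextRecord());
        }
        return out;
    }
}
```

Unlike an isLast()-based check, this loop also terminates correctly on an empty result.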


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59712684
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---
    @@ -152,47 +133,49 @@ public void testInvalidQuery() throws IOException {
     	@Test(expected = IllegalArgumentException.class)
     	public void testIncompleteConfiguration() throws IOException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    -				.setQuery("select * from books")
    +				.setDrivername(JDBCExample.DRIVER_CLASS)
    +				.setQuery(JDBCExample.SELECT_ALL_BOOKS)
     				.finish();
     	}
     
     	@Test(expected = IOException.class)
     	public void testIncompatibleTuple() throws IOException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    -				.setDBUrl("jdbc:derby:memory:ebookshop")
    -				.setQuery("select * from books")
    +				.setDrivername(JDBCExample.DRIVER_CLASS)
    +				.setDBUrl(JDBCExample.DB_URL)
    +				.setQuery(JDBCExample.SELECT_ALL_BOOKS)
     				.finish();
     		jdbcInputFormat.open(null);
     		jdbcInputFormat.nextRecord(new Tuple2());
     	}
     
     	@Test
    -	public void testJDBCInputFormat() throws IOException {
    +	public void testJDBCInputFormatWithoutParallelism() throws IOException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    -				.setDBUrl("jdbc:derby:memory:ebookshop")
    -				.setQuery("select * from books")
    +				.setDrivername(JDBCExample.DRIVER_CLASS)
    +				.setDBUrl(JDBCExample.DB_URL)
    +				.setQuery(JDBCExample.SELECT_ALL_BOOKS)
     				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
     				.finish();
     		jdbcInputFormat.open(null);
     		Tuple5 tuple = new Tuple5();
     		int recordCount = 0;
     		while (!jdbcInputFormat.reachedEnd()) {
     			jdbcInputFormat.nextRecord(tuple);
    -			Assert.assertEquals("Field 0 should be int", Integer.class, tuple.getField(0).getClass());
    -			Assert.assertEquals("Field 1 should be String", String.class, tuple.getField(1).getClass());
    -			Assert.assertEquals("Field 2 should be String", String.class, tuple.getField(2).getClass());
    -			Assert.assertEquals("Field 3 should be float", Double.class, tuple.getField(3).getClass());
    -			Assert.assertEquals("Field 4 should be int", Integer.class, tuple.getField(4).getClass());
    +			if(tuple.getField(0)!=null) Assert.assertEquals("Field 0 should be int", Integer.class, tuple.getField(0).getClass());
    --- End diff --
    
    missing braces and whitespace.
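A sketch of the requested style applied to the null-tolerant type check: spaces around != and braces even for a single-statement if body. FieldAssertions is a hypothetical helper that throws AssertionError directly so it runs without JUnit; in the test itself the inner block would keep the existing Assert.assertEquals call.

```java
// Hypothetical helper mirroring the test's per-field type assertions.
final class FieldAssertions {
    private FieldAssertions() {
    }

    static void assertTypeIfPresent(String message, Class<?> expected, Object field) {
        if (field != null) {
            if (!expected.equals(field.getClass())) {
                throw new AssertionError(message + ": expected " + expected.getName()
                        + " but was " + field.getClass().getName());
            }
        }
    }
}
```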


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59711128
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -81,19 +90,34 @@ public void configure(Configuration parameters) {
     	 * @throws IOException
     	 */
     	@Override
    -	public void open(InputSplit ignored) throws IOException {
    +	public void open(InputSplit inputSplit) throws IOException {
     		try {
    +			//TODO is this performed once per Task Manager..?
     			establishConnection();
     			statement = dbConn.createStatement(resultSetType, resultSetConcurrency);
    +			String query = queryTemplate;
    +			if(isSplitConfigured()){
    +				RangeInputSplit jdbcInputSplit = (RangeInputSplit) inputSplit;
    +				long start = jdbcInputSplit.getMin();
    +				long end = jdbcInputSplit.getMax();
    +				if(isSplitConfigured()){
    --- End diff --
    
    redundant check
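A sketch of the query-building step in open() with a single split check. The diff references CONDITIONS and BETWEEN but does not show their values, so "$CONDITIONS" and "%s BETWEEN %d AND %d" are assumptions, as is the SplitQuery class. Locale.ROOT keeps String.format from emitting locale-specific digits into the SQL.

```java
import java.util.Locale;

// Hypothetical reconstruction; constant values are assumed, not taken from the PR.
final class SplitQuery {
    static final String CONDITIONS = "$CONDITIONS";
    static final String BETWEEN = "%s BETWEEN %d AND %d";

    private SplitQuery() {
    }

    static String build(String queryTemplate, String splitColumnName, long min, long max) {
        if (splitColumnName == null) { // i.e. !isSplitConfigured()
            return queryTemplate;
        }
        // One check suffices; the inner isSplitConfigured() only repeated it.
        return queryTemplate.replace(CONDITIONS,
                String.format(Locale.ROOT, BETWEEN, splitColumnName, min, max));
    }
}
```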


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59713178
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -232,7 +255,7 @@ private void addValue(OUT reuse) throws IOException, SQLException {
     						reuse.setField(resultSet.getInt(pos + 1), pos);
     						break;
     					case java.sql.Types.FLOAT:
    -						reuse.setField(resultSet.getDouble(pos + 1), pos);
    +						reuse.setField(resultSet.getFloat(pos + 1), pos);
    --- End diff --
    
    The problem arises when I look at the column type in the DB and see FLOAT, so I set Float as the class of my tuple field and then get a ClassCastException...


[GitHub] flink pull request: Improved JDBCInputFormat (FLINK-3750) and othe...

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1885#discussion_r59712911
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -241,10 +264,31 @@ private void addValue(OUT reuse) throws IOException, SQLException {
     						reuse.setField(resultSet.getDouble(pos + 1), pos);
     						break;
     					case java.sql.Types.DECIMAL:
    -						reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
    -						break;
    +						//TODO manage null fields
    +						try {
    +							if (o == null){
    +								reuse.setField("", pos);
    +							} else{
    +								reuse.setField(resultSet.getBigDecimal(pos + 1).toPlainString(), pos);
    --- End diff --
    
    Because Flink cannot handle null values (unless you're using strings). We need to decide how to overcome this limitation: in JDBC, almost all tables contain null values, and converting them to 0.0 as in the Double case is bad...

