Posted to issues@flink.apache.org by fpompermaier <gi...@git.apache.org> on 2016/04/26 19:16:00 UTC

[GitHub] flink pull request: Flink 3750 fixed

GitHub user fpompermaier opened a pull request:

    https://github.com/apache/flink/pull/1941

    Flink 3750 fixed

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [X] General
      - The pull request references the related JIRA issue
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message
    
    - [X] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [X] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed
    
    ---------
    New cleaned PR aligned with the current master

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fpompermaier/flink FLINK-3750-fixed

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1941.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1941
    
----
commit 206920c85f617728d67d5d02f4fe07da228e069b
Author: Flavio Pompermaier <f....@gmail.com>
Date:   2016-04-26T17:10:53Z

    FLINK-3750 new PR (the old one was messed up...)

commit df0d9bff8fe8f5d291ab415e3a77db45f4fdbc92
Author: Flavio Pompermaier <f....@gmail.com>
Date:   2016-04-26T17:13:09Z

    Removed commented method

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Flink 3750 fixed

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62022375
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -157,14 +203,25 @@ public boolean reachedEnd() throws IOException {
     	 * @throws java.io.IOException
     	 */
     	@Override
    -	public OUT nextRecord(OUT tuple) throws IOException {
    +	public Row nextRecord(Row row) throws IOException {
     		try {
    -			resultSet.next();
    -			if (columnTypes == null) {
    -				extractTypes(tuple);
    +			hasNext = resultSet.next();
    +			if(!hasNext) {
    +				return null;
    +			}
    +			try{
    +				//This throws a NPE when the TypeInfo is not passed to the InputFormat,
    +				//i.e. KryoSerializer used to generate the passed row
    +				row.productArity();
    +			}catch(NullPointerException npe) {
    --- End diff --
    
    OK, thanks! I saw that there's a discussion about a formatter for IntelliJ. It would be great to have such a formatter for Eclipse as well :)



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62019090
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -139,12 +184,13 @@ public void close() throws IOException {
     	@Override
     	public boolean reachedEnd() throws IOException {
     		try {
    -			if (resultSet.isLast()) {
    -				close();
    +			if (!hasNext || resultSet.isLast()) {
    --- End diff --
    
    I think we should not use `resultSet.isLast()`.
    Instead, we can call `resultSet.next()` once in `open()` and again at the end of `nextRecord()`, and use the `hasNext` flag in `reachedEnd()`.
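
    The look-ahead pattern suggested above could be sketched roughly as follows. This is a minimal illustration, not the PR's code: `Cursor` and `ListCursor` are hypothetical stand-ins for `java.sql.ResultSet` so that the example runs without a database, and `LookaheadReader` is an invented name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for java.sql.ResultSet: next() advances and reports
// whether a row is available, current() returns that row.
interface Cursor {
    boolean next();
    String current();
}

// In-memory cursor over a list, used only to make the sketch runnable.
class ListCursor implements Cursor {
    private final List<String> rows;
    private int pos = -1;

    ListCursor(List<String> rows) {
        this.rows = rows;
    }

    @Override
    public boolean next() {
        pos++;
        return pos < rows.size();
    }

    @Override
    public String current() {
        return rows.get(pos);
    }
}

// Keeps a hasNext flag that is refreshed in open() and at the end of
// nextRecord(), so reachedEnd() never asks the cursor about its position.
class LookaheadReader {
    private Cursor cursor;
    private boolean hasNext;

    void open(Cursor cursor) {
        this.cursor = cursor;
        this.hasNext = cursor.next(); // false right away for an empty result
    }

    boolean reachedEnd() {
        return !hasNext;
    }

    String nextRecord() {
        if (!hasNext) {
            return null;
        }
        String record = cursor.current();
        hasNext = cursor.next(); // look ahead for the following call
        return record;
    }

    // Convenience for the sketch: drain all records into a list.
    static List<String> readAll(Cursor cursor) {
        LookaheadReader reader = new LookaheadReader();
        reader.open(cursor);
        List<String> out = new ArrayList<>();
        while (!reader.reachedEnd()) {
            out.add(reader.nextRecord());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(readAll(new ListCursor(Arrays.asList("a", "b"))));
    }
}
```

    With this shape, an empty split is reported as finished immediately after `open()`, and `reachedEnd()` stays a plain flag check.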



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62018617
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -157,14 +203,25 @@ public boolean reachedEnd() throws IOException {
     	 * @throws java.io.IOException
     	 */
     	@Override
    -	public OUT nextRecord(OUT tuple) throws IOException {
    +	public Row nextRecord(Row row) throws IOException {
     		try {
    -			resultSet.next();
    -			if (columnTypes == null) {
    -				extractTypes(tuple);
    +			hasNext = resultSet.next();
    +			if(!hasNext) {
    +				return null;
    +			}
    +			try{
    +				//This throws a NPE when the TypeInfo is not passed to the InputFormat,
    +				//i.e. KryoSerializer used to generate the passed row
    +				row.productArity();
    +			}catch(NullPointerException npe) {
    --- End diff --
    
    Missing spaces (`try {`, `} catch (`).



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62019163
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -95,32 +95,32 @@ private void establishConnection() throws SQLException, ClassNotFoundException {
     		}
     	}
     
    -	private enum SupportedTypes {
    -		BOOLEAN,
    -		BYTE,
    -		SHORT,
    -		INTEGER,
    -		LONG,
    -		STRING,
    -		FLOAT,
    -		DOUBLE
    -	}
    -
     	/**
     	 * Adds a record to the prepared statement.
     	 * <p>
     	 * When this method is called, the output format is guaranteed to be opened.
    +	 * 
    +	 * WARNING: this may fail if the JDBC driver doesn't handle null correctly and no column types specified in the SqlRow
     	 *
     	 * @param tuple The records to add to the output.
     	 * @throws IOException Thrown, if the records could not be added due to an I/O problem.
     	 */
     	@Override
    -	public void writeRecord(OUT tuple) throws IOException {
    +	public void writeRecord(Row tuple) throws IOException {
     		try {
    -			if (types == null) {
    -				extractTypes(tuple);
    +			for (int index = 0; index < tuple.productArity(); index++) {
    +				if(tuple.productElement(index) ==null && typesArray != null && typesArray.length > 0) {
    --- End diff --
    
    Missing spaces (`if (`, `== null`).



[GitHub] flink pull request: Flink 3750 fixed

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62847383
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -81,25 +134,51 @@ public void configure(Configuration parameters) {
     	 * @throws IOException
     	 */
     	@Override
    -	public void open(InputSplit ignored) throws IOException {
    +	public void open(InputSplit inputSplit) throws IOException {
    +		hasNext = true;
     		try {
    -			establishConnection();
    -			statement = dbConn.createStatement(resultSetType, resultSetConcurrency);
    -			resultSet = statement.executeQuery(query);
    +			if (inputSplit != null && parameterValues != null) {
    +				for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
    +					Object param = parameterValues[inputSplit.getSplitNumber()][i];
    +					if (param instanceof String) {
    +						statement.setString(i + 1, (String) param);
    +					} else if (param instanceof Long) {
    +						statement.setLong(i + 1, (Long) param);
    +					} else if (param instanceof Integer) {
    +						statement.setInt(i + 1, (Integer) param);
    +					} else if (param instanceof Double) {
    +						statement.setDouble(i + 1, (Double) param);
    +					} else if (param instanceof Boolean) {
    +						statement.setBoolean(i + 1, (Boolean) param);
    +					} else if (param instanceof Float) {
    +						statement.setFloat(i + 1, (Float) param);
    +					} else if (param instanceof BigDecimal) {
    +						statement.setBigDecimal(i + 1, (BigDecimal) param);
    +					} else if (param instanceof Byte) {
    +						statement.setByte(i + 1, (Byte) param);
    +					} else if (param instanceof Short) {
    +						statement.setShort(i + 1, (Short) param);
    +					} else if (param instanceof Date) {
    +						statement.setDate(i + 1, (Date) param);
    +					} else if (param instanceof Time) {
    +						statement.setTime(i + 1, (Time) param);
    +					} else if (param instanceof Timestamp) {
    +						statement.setTimestamp(i + 1, (Timestamp) param);
    +					} else if (param instanceof Array) {
    +						statement.setArray(i + 1, (Array) param);
    +					} else {
    +						//extends with other types if needed
    +						throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet)." );
    +					}
    +				}
    +				if (LOG.isDebugEnabled()) {
    +					LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
    +				}
    +			}
    +			resultSet = statement.executeQuery();
     		} catch (SQLException se) {
     			close();
    --- End diff --
    
    I did that in the previous version of this PR (https://github.com/apache/flink/pull/1885), but @zentol told me to leave it ("this is not guaranteed, so please add them back"). From what I checked, @zentol was right, wasn't he?



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62832588
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -81,25 +134,51 @@ public void configure(Configuration parameters) {
     	 * @throws IOException
     	 */
     	@Override
    -	public void open(InputSplit ignored) throws IOException {
    +	public void open(InputSplit inputSplit) throws IOException {
    +		hasNext = true;
    --- End diff --
    
    Call `resultSet.next()` at the end of `open()` and only set `hasNext = true` if the split actually contains a record.



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62834635
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCFullTest.java ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc.example;
    +
    +import java.sql.Types;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
    +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
    +import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
    +import org.apache.flink.api.java.io.jdbc.JDBCTestBase;
    +import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.junit.Test;
    +
    +public class JDBCFullTest extends JDBCTestBase {
    +	
    +	@Test
    +	public void test() throws Exception {
    --- End diff --
    
    This test does not assert the correctness of the data. It just checks that the code runs but not whether it is correct.
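
    One way to check the data itself, sketched under assumptions: `RowChecks` and `assertSameRows` are hypothetical names, the rows are assumed to have been read back from the output table as `Object[]` values, and both sides are assumed to be in the same order (for example sorted by id).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: compares rows read back from the output table with the
// expected test data, instead of only checking that the job ran.
class RowChecks {

    static void assertSameRows(Object[][] expected, List<Object[]> actual) {
        if (expected.length != actual.size()) {
            throw new AssertionError(
                    "expected " + expected.length + " rows but got " + actual.size());
        }
        for (int i = 0; i < expected.length; i++) {
            // Arrays.equals compares the fields pairwise with equals()
            if (!Arrays.equals(expected[i], actual.get(i))) {
                throw new AssertionError("row " + i + " differs: expected "
                        + Arrays.toString(expected[i]) + " but got "
                        + Arrays.toString(actual.get(i)));
            }
        }
    }

    public static void main(String[] args) {
        Object[][] expected = {{1001, "Java for dummies"}};
        List<Object[]> actual = new ArrayList<>();
        actual.add(new Object[] {1001, "Java for dummies"});
        assertSameRows(expected, actual); // passes silently
    }
}
```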



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62833293
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java ---
    @@ -0,0 +1,70 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io.jdbc.split;
    +
    +/** 
    + * 
    + * This query generator assumes that the query to parameterize contains a BETWEEN constraint on a numeric column.
    + * The generated query set will be of size equal to the configured fetchSize (apart the last one range),
    + * ranging from the min value up to the max.
    + * 
    + * For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>, using a query like:
    + * <PRE>
    + *   SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
    + * </PRE>
    + *
    + * you can use this class to automatically generate the parameters of the BETWEEN clause,
    + * based on the passed constructor parameters.
    + * 
    + * */
    +public class NumericBetweenParametersProvider implements ParameterValuesProvider {
    +
    +	private static final long serialVersionUID = 1L;
    +	private long fetchSize;
    +	private final long min;
    +	private final long max;
    +	
    +	public NumericBetweenParametersProvider(long fetchSize, long min, long max) {
    +		this.fetchSize = fetchSize;
    +		this.min = min;
    +		this.max = max;
    +	}
    +
    +	@Override
    +	public Object[][] getParameterValues(){
    +		double maxEelemCount = (max - min) + 1;
    +		int size = new Double(Math.ceil(maxEelemCount / fetchSize)).intValue();
    +		//decomment if we ever have to respect the minNumSplits constraint..
    +//		if (minNumberOfQueries > size) {
    --- End diff --
    
    Can this code be removed?
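
    For context, the range generation described in the Javadoc above could look roughly like this. It is a sketch, not the PR's implementation: `BetweenRanges` and `split` are hypothetical names, and the integer ceiling division is a swapped-in alternative to the `double`-based computation shown in the diff.

```java
import java.util.Arrays;

// Hypothetical helper, not the PR's NumericBetweenParametersProvider: splits
// the inclusive range [min, max] into consecutive BETWEEN bounds that each
// cover at most fetchSize ids.
class BetweenRanges {

    static long[][] split(long fetchSize, long min, long max) {
        if (fetchSize <= 0 || min > max) {
            throw new IllegalArgumentException("fetchSize must be positive and min <= max");
        }
        long elemCount = max - min + 1;
        // Integer ceiling division; assumes the sum does not overflow a long.
        int size = (int) ((elemCount + fetchSize - 1) / fetchSize);
        long[][] ranges = new long[size][2];
        long start = min;
        for (int i = 0; i < size; i++) {
            long end = Math.min(start + fetchSize - 1, max); // last range may be shorter
            ranges[i][0] = start;
            ranges[i][1] = end;
            start = end + 1;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // ids 1001..1005 with fetchSize 2
        System.out.println(Arrays.deepToString(split(2, 1001, 1005)));
        // prints [[1001, 1002], [1003, 1004], [1005, 1005]]
    }
}
```

    Each pair would fill the two `?` placeholders of the `BETWEEN ? AND ?` query. With a fetch size of 1 over ids 1001 to 1005, this sketch yields five ranges, which is the kind of count a test could assert on.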



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r63174853
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -157,14 +216,25 @@ public boolean reachedEnd() throws IOException {
     	 * @throws java.io.IOException
     	 */
     	@Override
    -	public OUT nextRecord(OUT tuple) throws IOException {
    +	public Row nextRecord(Row row) throws IOException {
     		try {
    -			resultSet.next();
    -			if (columnTypes == null) {
    -				extractTypes(tuple);
    +			hasNext = resultSet.next();
    +			if (!hasNext) {
    +				return null;
    +			}
    +			try {
    +				//This throws a NPE when the TypeInfo is not passed to the InputFormat,
    +				//i.e. KryoSerializer used to generate the passed row
    +				row.productArity();
    --- End diff --
    
    OK, I see. How about we extend the InputFormat to implement the `ResultTypeQueryable` interface and let users either specify all field types or at least the number of result attributes via the `JDBCInputFormatBuilder`?
    Then we do not have to fall back to the KryoSerializer, which creates corrupt `Row` objects, and users do not have to specify types in the `env.createInput()` method.



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r63161244
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java ---
    @@ -0,0 +1,182 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.sql.Connection;
    +import java.sql.DriverManager;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +
    +import org.junit.After;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +
    +/**
    + * Base test class for JDBC Input and Output formats
    + */
    +public class JDBCTestBase {
    +	
    +	public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
    +	public static final String DB_URL = "jdbc:derby:memory:ebookshop";
    +	public static final String INPUT_TABLE = "books";
    +	public static final String OUTPUT_TABLE = "newbooks";
    +	public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
    +	public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE;
    +	public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0";
    +	public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)";
    +	public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
    +	public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?";
    +	
    +	protected JDBCInputFormat jdbcInputFormat;
    +	protected JDBCOutputFormat jdbcOutputFormat;
    +
    +	protected static Connection conn;
    +
    +	public static final Object[][] testData = {
    --- End diff --
    
    Oh, yes. Sorry :blush: 



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62833691
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---
    @@ -19,180 +19,224 @@
     package org.apache.flink.api.java.io.jdbc;
     
     import java.io.IOException;
    -import java.sql.Connection;
    -import java.sql.DriverManager;
    -import java.sql.SQLException;
    -import java.sql.Statement;
     import java.sql.ResultSet;
     
    -
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
    +import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
    +import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
    +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.core.io.InputSplit;
     import org.junit.Assert;
    -
    -import org.apache.flink.api.java.tuple.Tuple2;
    -import org.apache.flink.api.java.tuple.Tuple5;
    -import org.junit.After;
    -import org.junit.AfterClass;
    -import org.junit.BeforeClass;
     import org.junit.Test;
     
    -public class JDBCInputFormatTest {
    -	JDBCInputFormat jdbcInputFormat;
    -
    -	static Connection conn;
    -
    -	static final Object[][] dbData = {
    -		{1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
    -		{1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
    -		{1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
    -		{1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
    -		{1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
    -
    -	@BeforeClass
    -	public static void setUpClass() {
    -		try {
    -			prepareDerbyDatabase();
    -		} catch (Exception e) {
    -			Assert.fail();
    -		}
    -	}
    -
    -	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
    -		System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL");
    -		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
    -		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
    -		conn = DriverManager.getConnection(dbURL);
    -		createTable();
    -		insertDataToSQLTable();
    -		conn.close();
    -	}
    -
    -	private static void createTable() throws SQLException {
    -		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
    -		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
    -		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
    -		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
    -		sqlQueryBuilder.append("PRIMARY KEY (id))");
    -
    -		Statement stat = conn.createStatement();
    -		stat.executeUpdate(sqlQueryBuilder.toString());
    -		stat.close();
    -	}
    -
    -	private static void insertDataToSQLTable() throws SQLException {
    -		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
    -		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
    -		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
    -		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
    -		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
    -		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
    -
    -		Statement stat = conn.createStatement();
    -		stat.execute(sqlQueryBuilder.toString());
    -		stat.close();
    -	}
    -
    -	@AfterClass
    -	public static void tearDownClass() {
    -		cleanUpDerbyDatabases();
    -	}
    -
    -	private static void cleanUpDerbyDatabases() {
    -		try {
    -			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
    -			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
    -
    -			conn = DriverManager.getConnection(dbURL);
    -			Statement stat = conn.createStatement();
    -			stat.executeUpdate("DROP TABLE books");
    -			stat.close();
    -			conn.close();
    -		} catch (Exception e) {
    -			e.printStackTrace();
    -			Assert.fail();
    -		}
    -	}
    -
    -	@After
    -	public void tearDown() {
    -		jdbcInputFormat = null;
    -	}
    +public class JDBCInputFormatTest extends JDBCTestBase {
     
     	@Test(expected = IllegalArgumentException.class)
     	public void testInvalidDriver() throws IOException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
     				.setDrivername("org.apache.derby.jdbc.idontexist")
    -				.setDBUrl("jdbc:derby:memory:ebookshop")
    -				.setQuery("select * from books")
    +				.setDBUrl(DB_URL)
    +				.setQuery(SELECT_ALL_BOOKS)
     				.finish();
    -		jdbcInputFormat.open(null);
    +		jdbcInputFormat.openInputFormat();
     	}
     
     	@Test(expected = IllegalArgumentException.class)
     	public void testInvalidURL() throws IOException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    +				.setDrivername(DRIVER_CLASS)
     				.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
    -				.setQuery("select * from books")
    +				.setQuery(SELECT_ALL_BOOKS)
     				.finish();
    -		jdbcInputFormat.open(null);
    +		jdbcInputFormat.openInputFormat();
     	}
     
     	@Test(expected = IllegalArgumentException.class)
     	public void testInvalidQuery() throws IOException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    -				.setDBUrl("jdbc:derby:memory:ebookshop")
    +				.setDrivername(DRIVER_CLASS)
    +				.setDBUrl(DB_URL)
     				.setQuery("iamnotsql")
     				.finish();
    -		jdbcInputFormat.open(null);
    +		jdbcInputFormat.openInputFormat();
     	}
     
     	@Test(expected = IllegalArgumentException.class)
     	public void testIncompleteConfiguration() throws IOException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    -				.setQuery("select * from books")
    +				.setDrivername(DRIVER_CLASS)
    +				.setQuery(SELECT_ALL_BOOKS)
     				.finish();
     	}
     
    -	@Test(expected = IOException.class)
    -	public void testIncompatibleTuple() throws IOException {
    +	@Test
    +	public void testJDBCInputFormatWithoutParallelism() throws IOException, InstantiationException, IllegalAccessException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    -				.setDBUrl("jdbc:derby:memory:ebookshop")
    -				.setQuery("select * from books")
    +				.setDrivername(DRIVER_CLASS)
    +				.setDBUrl(DB_URL)
    +				.setQuery(SELECT_ALL_BOOKS)
    +				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
     				.finish();
    +		jdbcInputFormat.openInputFormat();
     		jdbcInputFormat.open(null);
    -		jdbcInputFormat.nextRecord(new Tuple2());
    +		Row row =  new Row(5);
    +		int recordCount = 0;
    +		while (!jdbcInputFormat.reachedEnd()) {
    +			Row next = jdbcInputFormat.nextRecord(row);
    +			if (next == null) {
    +				break;
    +			}
    +			
    +			if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());}
    +			if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());}
    +			if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());}
    +			if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());}
    +			if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());}
    +
    +			for (int x = 0; x < 5; x++) {
    +				if(testData[recordCount][x]!=null) {
    +					Assert.assertEquals(testData[recordCount][x], next.productElement(x));
    +				}
    +			}
    +			recordCount++;
    +		}
    +		jdbcInputFormat.close();
    +		jdbcInputFormat.closeInputFormat();
    +		Assert.assertEquals(testData.length, recordCount);
    +	}
    +	
    +	@Test
    +	public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException {
    +		final int fetchSize = 1;
    +		final Long min = new Long(JDBCTestBase.testData[0][0] + "");
    +		final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0] + "");
    +		ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
    +		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    +				.setDrivername(DRIVER_CLASS)
    +				.setDBUrl(DB_URL)
    +				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
    +				.setParametersProvider(pramProvider)
    +				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
    +				.finish();
    +		jdbcInputFormat.openInputFormat();
    +		InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
    --- End diff --
    
    check number of returned splits


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62019142
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -314,7 +262,8 @@ public static JDBCInputFormatBuilder buildJDBCInputFormat() {
     
     		public JDBCInputFormatBuilder() {
     			this.format = new JDBCInputFormat();
    -			this.format.resultSetType = ResultSet.TYPE_FORWARD_ONLY;
    +			// The 'isLast' method is only allowed on scroll cursors.
    +			this.format.resultSetType = ResultSet.TYPE_SCROLL_INSENSITIVE;
    --- End diff --
    
    We can use `TYPE_FORWARD_ONLY` if we remove `ResultSet.isLast()`
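
A minimal sketch of the suggestion above, assuming end-of-data is tracked through the return value of `ResultSet.next()` rather than `ResultSet.isLast()`. The class `ForwardOnlyReader` and its `arity` parameter are hypothetical and are not part of the PR; only `next()` and `getObject(int)` are standard JDBC calls, and both work on a `TYPE_FORWARD_ONLY` cursor.

```java
import java.sql.ResultSet;
import java.sql.SQLException;

/** Hypothetical reader for illustration; this is not the PR's JDBCInputFormat. */
public class ForwardOnlyReader {
    private final ResultSet resultSet;
    // Stays true until next() reports that the cursor moved past the last row.
    private boolean hasNext = true;

    public ForwardOnlyReader(ResultSet resultSet) {
        this.resultSet = resultSet;
    }

    public boolean reachedEnd() {
        return !hasNext;
    }

    /** Returns the next row's column values, or null once the result set is exhausted. */
    public Object[] nextRecord(int arity) throws SQLException {
        hasNext = resultSet.next();
        if (!hasNext) {
            return null;
        }
        Object[] values = new Object[arity];
        for (int i = 0; i < arity; i++) {
            values[i] = resultSet.getObject(i + 1); // JDBC columns are 1-based
        }
        return values;
    }
}
```

With this shape the caller has to handle a `null` return from `nextRecord`, because `reachedEnd()` only becomes true after the call that runs off the end of the result set.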



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1941#issuecomment-219961812
  
    Hi @fpompermaier, I added one more comment. I'm not sure if you noticed the other two comments I made a few days back on `JDBCFullTest` and `JdbcInputFormat.nextRecord()` after your last update of the PR. Thanks, Fabian



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62834552
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java ---
    @@ -19,135 +19,42 @@
     package org.apache.flink.api.java.io.jdbc;
     
     import java.io.IOException;
    -import java.sql.Connection;
    -import java.sql.DriverManager;
    -import java.sql.SQLException;
    -import java.sql.Statement;
     import java.sql.ResultSet;
     
    -import org.junit.Assert;
    -
     import org.apache.flink.api.java.tuple.Tuple5;
    -import org.junit.After;
    -import org.junit.AfterClass;
    -import org.junit.BeforeClass;
    +import org.apache.flink.api.table.Row;
    +import org.junit.Assert;
     import org.junit.Test;
     
    -public class JDBCOutputFormatTest {
    -	private JDBCInputFormat jdbcInputFormat;
    -	private JDBCOutputFormat jdbcOutputFormat;
    -
    -	private static Connection conn;
    -
    -	static final Object[][] dbData = {
    -		{1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
    -		{1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
    -		{1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
    -		{1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
    -		{1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
    -
    -	@BeforeClass
    -	public static void setUpClass() throws SQLException {
    -		try {
    -			System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL");
    -			prepareDerbyDatabase();
    -		} catch (ClassNotFoundException e) {
    -			e.printStackTrace();
    -			Assert.fail();
    -		}
    -	}
    -
    -	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
    -		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
    -		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
    -		conn = DriverManager.getConnection(dbURL);
    -		createTable("books");
    -		createTable("newbooks");
    -		insertDataToSQLTables();
    -		conn.close();
    -	}
    -
    -	private static void createTable(String tableName) throws SQLException {
    -		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE ");
    -		sqlQueryBuilder.append(tableName);
    -		sqlQueryBuilder.append(" (");
    -		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
    -		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
    -		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
    -		sqlQueryBuilder.append("PRIMARY KEY (id))");
    -
    -		Statement stat = conn.createStatement();
    -		stat.executeUpdate(sqlQueryBuilder.toString());
    -		stat.close();
    -	}
    -
    -	private static void insertDataToSQLTables() throws SQLException {
    -		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
    -		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
    -		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
    -		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
    -		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
    -		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
    -
    -		Statement stat = conn.createStatement();
    -		stat.execute(sqlQueryBuilder.toString());
    -		stat.close();
    -	}
    -
    -	@AfterClass
    -	public static void tearDownClass() {
    -		cleanUpDerbyDatabases();
    -	}
    -
    -	private static void cleanUpDerbyDatabases() {
    -		try {
    -			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
    -			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
    -
    -			conn = DriverManager.getConnection(dbURL);
    -			Statement stat = conn.createStatement();
    -			stat.executeUpdate("DROP TABLE books");
    -			stat.executeUpdate("DROP TABLE newbooks");
    -			stat.close();
    -			conn.close();
    -		} catch (Exception e) {
    -			e.printStackTrace();
    -			Assert.fail();
    -		}
    -	}
    -
    -	@After
    -	public void tearDown() {
    -		jdbcOutputFormat = null;
    -	}
    +public class JDBCOutputFormatTest extends JDBCTestBase {
    --- End diff --
    
    Add a test to check the insertion of null values for different types.



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62832218
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -81,25 +134,51 @@ public void configure(Configuration parameters) {
     	 * @throws IOException
     	 */
     	@Override
    -	public void open(InputSplit ignored) throws IOException {
    +	public void open(InputSplit inputSplit) throws IOException {
    --- End diff --
    
    JavaDocs need to be updated for new parameter name



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r61735690
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -352,6 +294,13 @@ public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency)
     			format.resultSetConcurrency = resultSetConcurrency;
     			return this;
     		}
    +		public JDBCInputFormatBuilder setSplitConfig(String splitColumnName,long fetchSize, long min, long max) {
    --- End diff --
    
    - new line 
    - space after `String splitColumnName,`



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62832699
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -113,19 +192,7 @@ public void close() throws IOException {
     		try {
     			resultSet.close();
     		} catch (SQLException se) {
    -			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
    -		} catch (NullPointerException npe) {
    -		}
    -		try {
    -			statement.close();
    --- End diff --
    
    null check



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62832299
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -19,59 +19,112 @@
     package org.apache.flink.api.java.io.jdbc;
     
     import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.sql.Array;
     import java.sql.Connection;
    +import java.sql.Date;
     import java.sql.DriverManager;
    +import java.sql.PreparedStatement;
     import java.sql.ResultSet;
     import java.sql.ResultSetMetaData;
     import java.sql.SQLException;
    -import java.sql.Statement;
    -
    -import org.apache.flink.api.common.io.NonParallelInput;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.Arrays;
     
     import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
     import org.apache.flink.api.common.io.RichInputFormat;
     import org.apache.flink.api.common.io.statistics.BaseStatistics;
    -import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
    +import org.apache.flink.api.table.Row;
     import org.apache.flink.configuration.Configuration;
     import org.apache.flink.core.io.GenericInputSplit;
     import org.apache.flink.core.io.InputSplit;
     import org.apache.flink.core.io.InputSplitAssigner;
    -import org.apache.flink.types.NullValue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     /**
    - * InputFormat to read data from a database and generate tuples.
    + * InputFormat to read data from a database and generate Rows.
      * The InputFormat has to be configured using the supplied InputFormatBuilder.
      * 
    - * @param <OUT>
    - * @see Tuple
    + * In order to query the JDBC source in parallel, you need to provide a parameterized
    + * query template (i.e. a valid {@link PreparedStatement}) and a {@link ParameterValuesProvider} 
    + * which provides binding values for the query parameters.
    + * 
    + * @see Row
    + * @see ParameterValuesProvider
    + * @see PreparedStatement
      * @see DriverManager
      */
    -public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
    -	private static final long serialVersionUID = 1L;
    +public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> {
     
    +	private static final long serialVersionUID = 1L;
     	private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
     
     	private String username;
     	private String password;
     	private String drivername;
     	private String dbURL;
    -	private String query;
    +	private String queryTemplate;
     	private int resultSetType;
     	private int resultSetConcurrency;
     
     	private transient Connection dbConn;
    -	private transient Statement statement;
    +	private transient PreparedStatement statement;
     	private transient ResultSet resultSet;
    -
    -	private int[] columnTypes = null;
    -
    +	
    +	private boolean hasNext = true;
    +	private Object[][] parameterValues;
    +	
     	public JDBCInputFormat() {
     	}
     
     	@Override
     	public void configure(Configuration parameters) {
    +		//do nothing here
    +	}
    +	
    +	@Override
    +	public void openInputFormat() {
    +		//called once per inputFormat (on open)
    +		try {
    +			Class.forName(drivername);
    +			if (username == null) {
    +				dbConn = DriverManager.getConnection(dbURL);
    +			} else {
    +				dbConn = DriverManager.getConnection(dbURL, username, password);
    +			}
    +			statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
    +		} catch (SQLException se) {
    +			throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
    +		} catch (ClassNotFoundException cnfe) {
    +			throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
    +		}
    +	}
    +	
    +	@Override
    +	public void closeInputFormat() {
    +		//called once per inputFormat (on close)
    +		try {
    +			statement.close();
    +		} catch (SQLException se) {
    +			LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage());
    +		} catch (NullPointerException npe) {
    +		} finally {
    +			statement = null;
    +		}
    +		
    +		try {
    +			dbConn.close();
    --- End diff --
    
    Check for `dbConn == null`
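
A minimal sketch of the null check requested above, replacing the empty `catch (NullPointerException npe)` branch with an explicit test. The helper `QuietCloser.closeQuietly` is hypothetical and not part of the PR; it relies only on the fact that `Connection`, `Statement` and `ResultSet` all extend `AutoCloseable`, and it prints to `System.err` where the PR code would use its logger.

```java
/** Hypothetical helper for illustration; not part of the PR. */
public class QuietCloser {
    /**
     * Closes the resource if it was ever opened.
     * Returns true only if close() was called and completed without an exception.
     */
    public static boolean closeQuietly(AutoCloseable resource, String name) {
        if (resource == null) {
            return false; // nothing was opened, so there is nothing to close
        }
        try {
            resource.close();
            return true;
        } catch (Exception e) {
            // Report the failure and continue, mirroring the log-and-continue style of the diff.
            System.err.println(name + " couldn't be closed - " + e.getMessage());
            return false;
        }
    }
}
```

A caller could then write `closeQuietly(statement, "Statement")` followed by `closeQuietly(dbConn, "Connection")` and set both fields to `null` afterwards.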



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62018256
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -81,25 +127,36 @@ public void configure(Configuration parameters) {
     	 * @throws IOException
     	 */
     	@Override
    -	public void open(InputSplit ignored) throws IOException {
    +	public void open(InputSplit inputSplit) throws IOException {
    +		hasNext = true;
     		try {
    -			establishConnection();
    -			statement = dbConn.createStatement(resultSetType, resultSetConcurrency);
    -			resultSet = statement.executeQuery(query);
    +			if(inputSplit!=null && inputSplit instanceof QueryParamInputSplit) {
    --- End diff --
    
    spaces: `inputSplit != null`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62017855
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/QueryParamInputSplit.java ---
    @@ -0,0 +1,53 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.core.io.InputSplit;
    +
    +/**
    + * Query parameters container 
    + */
    +public class QueryParamInputSplit implements InputSplit {
    --- End diff --
    
    I don't think we need a special InputSplit.
    If we store the binding values in an `Object[][]` member variable of the `JDBCInputFormat`, we only need an index to look up the respective values. So we can use `GenericInputSplit`, which has a split number that can be used as the index.
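
A minimal sketch of the lookup described above, written in plain Java so it runs without Flink on the classpath. The class `SplitParameters` and the method `valuesForSplit` are hypothetical; the `int splitNumber` argument stands in for the split number carried by `GenericInputSplit`, and returning an empty array for a non-parameterized query is an assumption made for this example.

```java
/** Hypothetical helper for illustration; not part of the PR. */
public class SplitParameters {
    /** Returns the binding values for one split, using the split number as the row index. */
    public static Object[] valuesForSplit(Object[][] parameterValues, int splitNumber) {
        if (parameterValues == null) {
            return new Object[0]; // assumed: a non-parameterized query has nothing to bind
        }
        if (splitNumber < 0 || splitNumber >= parameterValues.length) {
            throw new IllegalArgumentException("No parameter values for split " + splitNumber);
        }
        return parameterValues[splitNumber];
    }
}
```

For a query template with two placeholders, such as one ending in `BETWEEN ? AND ?`, the array `{{1001L, 1002L}, {1003L, 1005L}}` would describe two splits, and split 1 would bind `1003L` and `1005L`.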



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62832846
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -157,14 +216,25 @@ public boolean reachedEnd() throws IOException {
     	 * @throws java.io.IOException
     	 */
     	@Override
    -	public OUT nextRecord(OUT tuple) throws IOException {
    +	public Row nextRecord(Row row) throws IOException {
     		try {
    -			resultSet.next();
    -			if (columnTypes == null) {
    -				extractTypes(tuple);
    +			hasNext = resultSet.next();
    +			if (!hasNext) {
    +				return null;
    +			}
    +			try {
    +				//This throws a NPE when the TypeInfo is not passed to the InputFormat,
    +				//i.e. KryoSerializer used to generate the passed row
    +				row.productArity();
    --- End diff --
    
    Why not check `row == null` instead of putting logic into the `catch` branch?



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62018300
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -81,25 +127,36 @@ public void configure(Configuration parameters) {
     	 * @throws IOException
     	 */
     	@Override
    -	public void open(InputSplit ignored) throws IOException {
    +	public void open(InputSplit inputSplit) throws IOException {
    +		hasNext = true;
     		try {
    -			establishConnection();
    -			statement = dbConn.createStatement(resultSetType, resultSetConcurrency);
    -			resultSet = statement.executeQuery(query);
    +			if(inputSplit!=null && inputSplit instanceof QueryParamInputSplit) {
    +				QueryParamInputSplit jdbcInputSplit = (QueryParamInputSplit) inputSplit;
    +				for (int i = 0; i < jdbcInputSplit.getQueryParameters().length; i++) {
    +					Object param = jdbcInputSplit.getQueryParameters()[i];
    +					if(param instanceof String) {
    +						statement.setString(i + 1, (String) param);
    +					} else if(param instanceof Long) {
    +						statement.setLong(i + 1, (Long) param);
    +					} else if(param instanceof Integer) {
    +						statement.setInt(i + 1, (Integer) param);
    +					} else if(param instanceof Double) {
    +						statement.setDouble(i + 1, (Double) param);
    +					} else if(param instanceof Boolean) {
    +						statement.setBoolean(i + 1, (Boolean) param);
    +					} else {
    --- End diff --
    
    Add common time types (date, time, timestamp) and the remaining primitives: byte, short, float, ...



[GitHub] flink pull request: Flink 3750 fixed

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62852662
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -157,14 +216,25 @@ public boolean reachedEnd() throws IOException {
     	 * @throws java.io.IOException
     	 */
     	@Override
    -	public OUT nextRecord(OUT tuple) throws IOException {
    +	public Row nextRecord(Row row) throws IOException {
     		try {
    -			resultSet.next();
    -			if (columnTypes == null) {
    -				extractTypes(tuple);
    +			hasNext = resultSet.next();
    +			if (!hasNext) {
    +				return null;
    +			}
    +			try {
    +				//This throws a NPE when the TypeInfo is not passed to the InputFormat,
    +				//i.e. KryoSerializer used to generate the passed row
    +				row.productArity();
    --- End diff --
    
    Because the row can be non-null (`row != null`) and still be uninitialized internally (see for example **JDBCInputFormatTest.testUninitializedRow**).
    
    That happens when you do the following (for example):
    ``` java
    DataSet<Row> source = environment.createInput(inputBuilder.finish());
    ```
    
    The only way I found to detect this situation is to catch the `NullPointerException` thrown when calling one of its methods, e.g.
    ```java
    row.productArity()
    ```




[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62018058
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/JDBCInputSplitsGenerator.java ---
    @@ -15,17 +15,14 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    -package org.apache.flink.api.java.io.jdbc;
    +package org.apache.flink.api.java.io.jdbc.split;
     
    -import java.io.OutputStream;
    +import java.io.Serializable;
     
    -@SuppressWarnings("unused")
    -/**
    - * Utility class to disable derby logging
    - */
    -public class DerbyUtil {
    -	public static final OutputStream DEV_NULL = new OutputStream() {
    -		public void write(int b) {
    -		}
    -	};
    +import org.apache.flink.api.java.io.QueryParamInputSplit;
    +
    +public interface JDBCInputSplitsGenerator extends Serializable {
    --- End diff --
    
    I think we should hide the concept of input splits from users. Can you rename the interface to `ParameterValueProvider`?



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r63161970
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -94,31 +94,106 @@ private void establishConnection() throws SQLException, ClassNotFoundException {
     			dbConn = DriverManager.getConnection(dbURL, username, password);
     		}
     	}
    -
    +	
     	/**
     	 * Adds a record to the prepared statement.
     	 * <p>
     	 * When this method is called, the output format is guaranteed to be opened.
    +	 * </p>
     	 * 
    -	 * WARNING: this may fail if the JDBC driver doesn't handle null correctly and no column types specified in the SqlRow
    +	 * WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
    --- End diff --
    
    This warning should also go into the JavaDocs of the `JdbcOutputFormat` class.



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62832479
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -81,25 +134,51 @@ public void configure(Configuration parameters) {
     	 * @throws IOException
     	 */
     	@Override
    -	public void open(InputSplit ignored) throws IOException {
    +	public void open(InputSplit inputSplit) throws IOException {
    +		hasNext = true;
     		try {
    -			establishConnection();
    -			statement = dbConn.createStatement(resultSetType, resultSetConcurrency);
    -			resultSet = statement.executeQuery(query);
    +			if (inputSplit != null && parameterValues != null) {
    +				for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
    +					Object param = parameterValues[inputSplit.getSplitNumber()][i];
    +					if (param instanceof String) {
    +						statement.setString(i + 1, (String) param);
    +					} else if (param instanceof Long) {
    +						statement.setLong(i + 1, (Long) param);
    +					} else if (param instanceof Integer) {
    +						statement.setInt(i + 1, (Integer) param);
    +					} else if (param instanceof Double) {
    +						statement.setDouble(i + 1, (Double) param);
    +					} else if (param instanceof Boolean) {
    +						statement.setBoolean(i + 1, (Boolean) param);
    +					} else if (param instanceof Float) {
    +						statement.setFloat(i + 1, (Float) param);
    +					} else if (param instanceof BigDecimal) {
    +						statement.setBigDecimal(i + 1, (BigDecimal) param);
    +					} else if (param instanceof Byte) {
    +						statement.setByte(i + 1, (Byte) param);
    +					} else if (param instanceof Short) {
    +						statement.setShort(i + 1, (Short) param);
    +					} else if (param instanceof Date) {
    +						statement.setDate(i + 1, (Date) param);
    +					} else if (param instanceof Time) {
    +						statement.setTime(i + 1, (Time) param);
    +					} else if (param instanceof Timestamp) {
    +						statement.setTimestamp(i + 1, (Timestamp) param);
    +					} else if (param instanceof Array) {
    +						statement.setArray(i + 1, (Array) param);
    +					} else {
    +						//extends with other types if needed
    +						throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet)." );
    +					}
    +				}
    +				if (LOG.isDebugEnabled()) {
    +					LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
    +				}
    +			}
    +			resultSet = statement.executeQuery();
     		} catch (SQLException se) {
     			close();
    --- End diff --
    
    No need to call `close()`. This should be done by the object that manages the InputFormat's life cycle.



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r63702313
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -48,17 +49,53 @@
     /**
      * InputFormat to read data from a database and generate Rows.
      * The InputFormat has to be configured using the supplied InputFormatBuilder.
    - * 
    - * In order to query the JDBC source in parallel, you need to provide a parameterized
    - * query template (i.e. a valid {@link PreparedStatement}) and a {@link ParameterValuesProvider} 
    - * which provides binding values for the query parameters.
    - * 
    + * A valid RowTypeInfo must be properly configured in the builder, e.g.: </br>
    --- End diff --
    
    Thanks for the nice JavaDocs! 


---

[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r63162186
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -94,31 +94,106 @@ private void establishConnection() throws SQLException, ClassNotFoundException {
     			dbConn = DriverManager.getConnection(dbURL, username, password);
     		}
     	}
    -
    +	
     	/**
     	 * Adds a record to the prepared statement.
     	 * <p>
     	 * When this method is called, the output format is guaranteed to be opened.
    +	 * </p>
     	 * 
    -	 * WARNING: this may fail if the JDBC driver doesn't handle null correctly and no column types specified in the SqlRow
    +	 * WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
    +	 * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null))
     	 *
    -	 * @param tuple The records to add to the output.
    +	 * @param row The records to add to the output.
    +	 * @see PreparedStatement
     	 * @throws IOException Thrown, if the records could not be added due to an I/O problem.
     	 */
     	@Override
    -	public void writeRecord(Row tuple) throws IOException {
    +	public void writeRecord(Row row) throws IOException {
    +		if (typesArray != null && typesArray.length > 0 && typesArray.length == row.productArity()) {
    +			LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
    +		} 
     		try {
    -			for (int index = 0; index < tuple.productArity(); index++) {
    -				if (tuple.productElement(index) == null && typesArray != null && typesArray.length > 0) {
    -					if (typesArray.length == tuple.productArity()) {
    -						upload.setNull(index + 1, typesArray[index]);
    -					} else {
    -						LOG.warn("Column SQL types array doesn't match arity of SqlRow! Check the passed array...");
    -					}
    +			for (int index = 0; index < row.productArity(); index++) {
    +				if (typesArray == null ) {
    --- End diff --
    
    I would move this check out of the loop, i.e.,
    ```
    if (typesArray == null) {
      for (...) {
        //...
        upload.setObject(...)
      }
    } else {
      for (...) {
        // ...
      }
    }
    ```
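
The suggestion above can be fleshed out as a self-contained sketch. The `Binder` interface is a hypothetical stand-in for the two `PreparedStatement` setters involved, and `bind` and `trace` are illustrative names that are not part of the PR. The sketch also assumes that `typesArray`, when present, has at least as many entries as the row has fields.

```java
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

public class HoistedNullCheckSketch {

    /** Hypothetical, minimal stand-in for the two PreparedStatement setters used here. */
    public interface Binder {
        void setObject(int position, Object value);
        void setNull(int position, int sqlType);
    }

    /** Binds all fields; the typesArray == null decision is made once, outside the loops. */
    public static void bind(Binder upload, Object[] fields, int[] typesArray) {
        if (typesArray == null) {
            // No SQL types known: best-effort generic binding for every field.
            for (int index = 0; index < fields.length; index++) {
                upload.setObject(index + 1, fields[index]);
            }
        } else {
            // SQL types known: nulls can be bound with an explicit type.
            for (int index = 0; index < fields.length; index++) {
                if (fields[index] == null) {
                    upload.setNull(index + 1, typesArray[index]);
                } else {
                    upload.setObject(index + 1, fields[index]);
                }
            }
        }
    }

    /** Records the calls as strings so the behaviour can be inspected. */
    public static List<String> trace(Object[] fields, int[] typesArray) {
        final List<String> calls = new ArrayList<>();
        bind(new Binder() {
            @Override
            public void setObject(int position, Object value) {
                calls.add("setObject(" + position + ", " + value + ")");
            }

            @Override
            public void setNull(int position, int sqlType) {
                calls.add("setNull(" + position + ", " + sqlType + ")");
            }
        }, fields, typesArray);
        return calls;
    }

    public static void main(String[] args) {
        Object[] row = {1010, null};
        System.out.println(trace(row, null));
        System.out.println(trace(row, new int[] {Types.INTEGER, Types.VARCHAR}));
    }
}
```

Each loop body now depends only on the field at hand, so the per-field branch on `typesArray` disappears.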


---

[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1941#issuecomment-216825461
  
    Thanks for the update @fpompermaier. I added a few comments and suggestions.


---

[GitHub] flink pull request: Flink 3750 fixed

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62854701
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCFullTest.java ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc.example;
    +
    +import java.sql.Types;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
    +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
    +import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
    +import org.apache.flink.api.java.io.jdbc.JDBCTestBase;
    +import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.junit.Test;
    +
    +public class JDBCFullTest extends JDBCTestBase {
    +	
    +	@Test
    +	public void test() throws Exception {
    --- End diff --
    
    Before my PR this class was not even a test; the code was inside a main(). I thought this was at least better than that, but I didn't want to spend more time on it. Is it a problem if we leave it as it is?


---

[GitHub] flink pull request: Flink 3750 fixed

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62850239
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java ---
    @@ -0,0 +1,182 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.sql.Connection;
    +import java.sql.DriverManager;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +
    +import org.junit.After;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +
    +/**
    + * Base test class for JDBC Input and Output formats
    + */
    +public class JDBCTestBase {
    +	
    +	public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
    +	public static final String DB_URL = "jdbc:derby:memory:ebookshop";
    +	public static final String INPUT_TABLE = "books";
    +	public static final String OUTPUT_TABLE = "newbooks";
    +	public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
    +	public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE;
    +	public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0";
    +	public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)";
    +	public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
    +	public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?";
    +	
    +	protected JDBCInputFormat jdbcInputFormat;
    +	protected JDBCOutputFormat jdbcOutputFormat;
    +
    +	protected static Connection conn;
    +
    +	public static final Object[][] testData = {
    --- End diff --
    
    The record with id 1010 already contains a null field. Isn't that sufficient?


---

[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62833327
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java ---
    @@ -0,0 +1,70 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io.jdbc.split;
    +
    +/** 
    + * 
    + * This query generator assumes that the query to parameterize contains a BETWEEN constraint on a numeric column.
    + * The generated query set will be of size equal to the configured fetchSize (apart the last one range),
    + * ranging from the min value up to the max.
    + * 
    + * For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>, using a query like:
    + * <PRE>
    + *   SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
    + * </PRE>
    + *
    + * you can use this class to automatically generate the parameters of the BETWEEN clause,
    + * based on the passed constructor parameters.
    + * 
    + * */
    +public class NumericBetweenParametersProvider implements ParameterValuesProvider {
    +
    +	private static final long serialVersionUID = 1L;
    +	private long fetchSize;
    +	private final long min;
    +	private final long max;
    +	
    +	public NumericBetweenParametersProvider(long fetchSize, long min, long max) {
    +		this.fetchSize = fetchSize;
    +		this.min = min;
    +		this.max = max;
    +	}
    +
    +	@Override
    +	public Object[][] getParameterValues(){
    +		double maxEelemCount = (max - min) + 1;
    --- End diff --
    
    Typo in variable name: `maxEelemCount`.
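
For context, the JavaDoc in the diff above describes splitting the range from `min` to `max` into BETWEEN parameter pairs of at most `fetchSize` elements. The quoted diff is cut off before the actual computation, so the following is only a sketch of one way to do it. The class name and the `ranges` method are hypothetical, and integer ceiling division is an assumption, not necessarily what the PR does.

```java
import java.util.Arrays;

public class BetweenRangesSketch {

    /** Returns one {lower, upper} pair per batch, covering [min, max] inclusively. */
    public static long[][] ranges(long fetchSize, long min, long max) {
        if (fetchSize <= 0 || min > max) {
            throw new IllegalArgumentException("fetchSize must be positive and min <= max");
        }
        long elemCount = (max - min) + 1;
        // Ceiling division in integer arithmetic, avoiding a detour through double.
        int batches = (int) ((elemCount + fetchSize - 1) / fetchSize);
        long[][] parameters = new long[batches][];
        long lower = min;
        for (int i = 0; i < batches; i++) {
            // The last range may be shorter than fetchSize.
            long upper = Math.min(lower + fetchSize - 1, max);
            parameters[i] = new long[] {lower, upper};
            lower = upper + 1;
        }
        return parameters;
    }

    public static void main(String[] args) {
        // Prints [[1001, 1003], [1004, 1006], [1007, 1009], [1010, 1010]]
        System.out.println(Arrays.deepToString(ranges(3, 1001, 1010)));
    }
}
```

Each inner pair would be bound to the two `?` placeholders of a query such as `SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?`.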


---

[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62833637
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---
    @@ -19,180 +19,224 @@
     package org.apache.flink.api.java.io.jdbc;
     
     import java.io.IOException;
    -import java.sql.Connection;
    -import java.sql.DriverManager;
    -import java.sql.SQLException;
    -import java.sql.Statement;
     import java.sql.ResultSet;
     
    -
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
    +import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
    +import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
    +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.core.io.InputSplit;
     import org.junit.Assert;
    -
    -import org.apache.flink.api.java.tuple.Tuple2;
    -import org.apache.flink.api.java.tuple.Tuple5;
    -import org.junit.After;
    -import org.junit.AfterClass;
    -import org.junit.BeforeClass;
     import org.junit.Test;
     
    -public class JDBCInputFormatTest {
    -	JDBCInputFormat jdbcInputFormat;
    -
    -	static Connection conn;
    -
    -	static final Object[][] dbData = {
    -		{1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
    -		{1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
    -		{1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
    -		{1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
    -		{1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
    -
    -	@BeforeClass
    -	public static void setUpClass() {
    -		try {
    -			prepareDerbyDatabase();
    -		} catch (Exception e) {
    -			Assert.fail();
    -		}
    -	}
    -
    -	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
    -		System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL");
    -		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
    -		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
    -		conn = DriverManager.getConnection(dbURL);
    -		createTable();
    -		insertDataToSQLTable();
    -		conn.close();
    -	}
    -
    -	private static void createTable() throws SQLException {
    -		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
    -		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
    -		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
    -		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
    -		sqlQueryBuilder.append("PRIMARY KEY (id))");
    -
    -		Statement stat = conn.createStatement();
    -		stat.executeUpdate(sqlQueryBuilder.toString());
    -		stat.close();
    -	}
    -
    -	private static void insertDataToSQLTable() throws SQLException {
    -		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
    -		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
    -		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
    -		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
    -		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
    -		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
    -
    -		Statement stat = conn.createStatement();
    -		stat.execute(sqlQueryBuilder.toString());
    -		stat.close();
    -	}
    -
    -	@AfterClass
    -	public static void tearDownClass() {
    -		cleanUpDerbyDatabases();
    -	}
    -
    -	private static void cleanUpDerbyDatabases() {
    -		try {
    -			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
    -			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
    -
    -			conn = DriverManager.getConnection(dbURL);
    -			Statement stat = conn.createStatement();
    -			stat.executeUpdate("DROP TABLE books");
    -			stat.close();
    -			conn.close();
    -		} catch (Exception e) {
    -			e.printStackTrace();
    -			Assert.fail();
    -		}
    -	}
    -
    -	@After
    -	public void tearDown() {
    -		jdbcInputFormat = null;
    -	}
    +public class JDBCInputFormatTest extends JDBCTestBase {
     
     	@Test(expected = IllegalArgumentException.class)
     	public void testInvalidDriver() throws IOException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
     				.setDrivername("org.apache.derby.jdbc.idontexist")
    -				.setDBUrl("jdbc:derby:memory:ebookshop")
    -				.setQuery("select * from books")
    +				.setDBUrl(DB_URL)
    +				.setQuery(SELECT_ALL_BOOKS)
     				.finish();
    -		jdbcInputFormat.open(null);
    +		jdbcInputFormat.openInputFormat();
     	}
     
     	@Test(expected = IllegalArgumentException.class)
     	public void testInvalidURL() throws IOException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    +				.setDrivername(DRIVER_CLASS)
     				.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
    -				.setQuery("select * from books")
    +				.setQuery(SELECT_ALL_BOOKS)
     				.finish();
    -		jdbcInputFormat.open(null);
    +		jdbcInputFormat.openInputFormat();
     	}
     
     	@Test(expected = IllegalArgumentException.class)
     	public void testInvalidQuery() throws IOException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    -				.setDBUrl("jdbc:derby:memory:ebookshop")
    +				.setDrivername(DRIVER_CLASS)
    +				.setDBUrl(DB_URL)
     				.setQuery("iamnotsql")
     				.finish();
    -		jdbcInputFormat.open(null);
    +		jdbcInputFormat.openInputFormat();
     	}
     
     	@Test(expected = IllegalArgumentException.class)
     	public void testIncompleteConfiguration() throws IOException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    -				.setQuery("select * from books")
    +				.setDrivername(DRIVER_CLASS)
    +				.setQuery(SELECT_ALL_BOOKS)
     				.finish();
     	}
     
    -	@Test(expected = IOException.class)
    -	public void testIncompatibleTuple() throws IOException {
    +	@Test
    +	public void testJDBCInputFormatWithoutParallelism() throws IOException, InstantiationException, IllegalAccessException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    -				.setDBUrl("jdbc:derby:memory:ebookshop")
    -				.setQuery("select * from books")
    +				.setDrivername(DRIVER_CLASS)
    +				.setDBUrl(DB_URL)
    +				.setQuery(SELECT_ALL_BOOKS)
    +				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
     				.finish();
    +		jdbcInputFormat.openInputFormat();
    --- End diff --
    
    Check that `jdbcInputFormat.getInputSplits(1).length == 1`.


---

[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62832955
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -157,14 +216,25 @@ public boolean reachedEnd() throws IOException {
     	 * @throws java.io.IOException
     	 */
     	@Override
    -	public OUT nextRecord(OUT tuple) throws IOException {
    +	public Row nextRecord(Row row) throws IOException {
     		try {
    -			resultSet.next();
    -			if (columnTypes == null) {
    -				extractTypes(tuple);
    +			hasNext = resultSet.next();
    +			if (!hasNext) {
    +				return null;
    +			}
    +			try {
    +				//This throws a NPE when the TypeInfo is not passed to the InputFormat,
    +				//i.e. KryoSerializer used to generate the passed row
    +				row.productArity();
    +			} catch(NullPointerException npe) {
    +				ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
    +				row = new Row(resultSetMetaData.getColumnCount());
    +				LOG.warn("TypeInfo not provided to the InputFormat. Row cannot be reused.");
     			}
    -			addValue(tuple);
    -			return tuple;
    +			for (int pos = 0; pos < row.productArity(); pos++) {
    +				row.setField(pos, resultSet.getObject(pos + 1));
    +			}
    +			return row;
     		} catch (SQLException se) {
     			close();
    --- End diff --
    
    Let the data source task call `close()`


---

[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62019420
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericColumnSplitsGenerator.java ---
    @@ -0,0 +1,63 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io.jdbc.split;
    +
    +import org.apache.flink.api.java.io.QueryParamInputSplit;
    +
    +/** 
    + * 
    + * This splits generator assumes that the query has a BETWEEN constraint on a numeric column.
    + * The generated input splits will be of size equal to the configured fetchSize (maximum),
    + * ranging from the min value up to the max 
    + * 
    + * */
    +public class NumericColumnSplitsGenerator implements JDBCInputSplitsGenerator {
    --- End diff --
    
    I think ParameterValueProviders are very query-specific. I'm not sure it makes sense to provide default implementations. This class should be moved to the test scope, IMO.


---

[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62019506
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCFullTest.java ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc.example;
    +
    +import java.sql.Types;
    +import java.util.Arrays;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
    +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
    +import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
    +import org.apache.flink.api.java.io.jdbc.JDBCTestBase;
    +import org.apache.flink.api.java.io.jdbc.split.NumericColumnSplitsGenerator;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.junit.Test;
    +
    +public class JDBCFullTest extends JDBCTestBase {
    +	
    +	@Test
    +	public void testWithParallelism() throws Exception {
    +		//run without parallelism
    +		runTest(false);
    +
    +		//cleanup
    +		JDBCTestBase.tearDownClass();
    +		JDBCTestBase.prepareTestDb();
    +		
    +		//run expliting parallelism
    +		runTest(true);
    +		
    +	}
    +	
    +	private void runTest(boolean exploitParallelism) throws Exception {
    +		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    +		JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
    +				.setDrivername(JDBCTestBase.DRIVER_CLASS)
    +				.setDBUrl(JDBCTestBase.DB_URL)
    +				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS);
    +		
    +		if(exploitParallelism) {
    +			final int fetchSize = 1;
    +			final Long min = new Long(JDBCTestBase.testData[0][0] + "");
    +			final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length-fetchSize][0] + "");
    +			//use a "splittable" query to exploit parallelism
    +			inputBuilder = inputBuilder
    +					.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
    +					.setSplitsGenerator(new NumericColumnSplitsGenerator(fetchSize, min, max));
    +		}
    +		TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
    +			BasicTypeInfo.INT_TYPE_INFO,
    +			BasicTypeInfo.STRING_TYPE_INFO,
    +			BasicTypeInfo.STRING_TYPE_INFO,
    +			BasicTypeInfo.DOUBLE_TYPE_INFO,
    +			BasicTypeInfo.INT_TYPE_INFO
    +		};
    +		TypeInformation<Row> rowTypeInfo = new RowTypeInfo(scala.collection.JavaConversions.asScalaBuffer(Arrays.asList(fieldTypes)).seq());
    --- End diff --
    
    If you rebase the PR to the current master, `RowTypeInfo` was extended by a constructor that accepts plain Java arrays.


---

[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r63702355
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -48,17 +49,53 @@
     /**
      * InputFormat to read data from a database and generate Rows.
      * The InputFormat has to be configured using the supplied InputFormatBuilder.
    - * 
    - * In order to query the JDBC source in parallel, you need to provide a parameterized
    - * query template (i.e. a valid {@link PreparedStatement}) and a {@link ParameterValuesProvider} 
    - * which provides binding values for the query parameters.
    - * 
    + * A valid RowTypeInfo must be properly configured in the builder, e.g.: </br>
    + *
    + * <pre><code>
    + * TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
    + *		BasicTypeInfo.INT_TYPE_INFO,
    + *		BasicTypeInfo.STRING_TYPE_INFO,
    + *		BasicTypeInfo.STRING_TYPE_INFO,
    + *		BasicTypeInfo.DOUBLE_TYPE_INFO,
    + *		BasicTypeInfo.INT_TYPE_INFO
    + *	};
    + *
    + * RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
    + *
    + * Serializable[][] queryParameters = new String[2][1];
    --- End diff --
    
    This should go into the next code block about parallel reading.


---

[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62833230
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -95,32 +95,32 @@ private void establishConnection() throws SQLException, ClassNotFoundException {
     		}
     	}
     
    -	private enum SupportedTypes {
    -		BOOLEAN,
    -		BYTE,
    -		SHORT,
    -		INTEGER,
    -		LONG,
    -		STRING,
    -		FLOAT,
    -		DOUBLE
    -	}
    -
     	/**
     	 * Adds a record to the prepared statement.
     	 * <p>
     	 * When this method is called, the output format is guaranteed to be opened.
    +	 * 
    +	 * WARNING: this may fail if the JDBC driver doesn't handle null correctly and no column types specified in the SqlRow
     	 *
     	 * @param tuple The records to add to the output.
     	 * @throws IOException Thrown, if the records could not be added due to an I/O problem.
     	 */
     	@Override
    -	public void writeRecord(OUT tuple) throws IOException {
    +	public void writeRecord(Row tuple) throws IOException {
     		try {
    -			if (types == null) {
    -				extractTypes(tuple);
    +			for (int index = 0; index < tuple.productArity(); index++) {
    +				if (tuple.productElement(index) == null && typesArray != null && typesArray.length > 0) {
    +					if (typesArray.length == tuple.productArity()) {
    +						upload.setNull(index + 1, typesArray[index]);
    +					} else {
    +						LOG.warn("Column SQL types array doesn't match arity of SqlRow! Check the passed array...");
    +					}
    +				} else {
    +					//try generic set if no column type available
    +					//WARNING: this may fail if the JDBC driver doesn't handle null correctly
    +					upload.setObject(index + 1, tuple.productElement(index));
    --- End diff --
    
    Are there any drawbacks to using the generic `setObject()` method compared to the typed methods? If so, we should use the typed methods whenever the column types are specified.
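
One known weak spot of the untyped path is null handling: with `setObject(index, null)` the driver has to guess the SQL type, whereas `setNull(index, sqlType)` states it explicitly. The following is a minimal sketch of a type-aware binding step, not the code from this PR. The class `TypedBinding`, its `bind` method, and the recording stand-in statement are hypothetical names introduced only for illustration; the `PreparedStatement` calls themselves are standard JDBC.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only.
class TypedBinding {

    /** Binds one record; declared SQL types are used whenever they are available. */
    static void bind(PreparedStatement stmt, Object[] values, int[] sqlTypes) throws SQLException {
        // Validate the arity once, before the loop, instead of once per field.
        if (sqlTypes != null && sqlTypes.length != values.length) {
            throw new IllegalArgumentException("SQL types array does not match record arity");
        }
        for (int i = 0; i < values.length; i++) {
            if (sqlTypes == null) {
                // No type information: generic path, null handling is driver-dependent.
                stmt.setObject(i + 1, values[i]);
            } else if (values[i] == null) {
                // Null with a known type: tell the driver which SQL type is meant.
                stmt.setNull(i + 1, sqlTypes[i]);
            } else {
                // Non-null with a known type: ask for an explicit conversion.
                stmt.setObject(i + 1, values[i], sqlTypes[i]);
            }
        }
    }

    /** Stand-in statement that only records calls, so the sketch runs without a database. */
    static PreparedStatement recordingStatement(List<String> calls) {
        return (PreparedStatement) Proxy.newProxyInstance(
            TypedBinding.class.getClassLoader(),
            new Class<?>[] {PreparedStatement.class},
            (proxy, method, args) -> {
                calls.add(method.getName() + Arrays.toString(args));
                return null; // only void setters are invoked in this sketch
            });
    }
}
```

With the values `{null, "x"}` and the types `{Types.INTEGER, Types.VARCHAR}`, the first field goes through `setNull` and the second through the three-argument `setObject`.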



[GitHub] flink pull request: Flink 3750 fixed

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r61736272
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -26,52 +26,69 @@
     import java.sql.SQLException;
     import java.sql.Statement;
     
    -import org.apache.flink.api.common.io.NonParallelInput;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
    -
     import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
     import org.apache.flink.api.common.io.RichInputFormat;
     import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.java.io.RangeInputSplit;
    +import org.apache.flink.api.java.io.GenericRow;
     import org.apache.flink.api.java.tuple.Tuple;
     import org.apache.flink.configuration.Configuration;
     import org.apache.flink.core.io.GenericInputSplit;
     import org.apache.flink.core.io.InputSplit;
     import org.apache.flink.core.io.InputSplitAssigner;
    -import org.apache.flink.types.NullValue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     /**
      * InputFormat to read data from a database and generate tuples.
      * The InputFormat has to be configured using the supplied InputFormatBuilder.
      * 
    + * Remark: split (if set) works only if the split column is numeric
    + * 
      * @param <OUT>
      * @see Tuple
      * @see DriverManager
      */
    -public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
    -	private static final long serialVersionUID = 1L;
    +public class JDBCInputFormat extends RichInputFormat<GenericRow, InputSplit> {
     
    +	private static final long serialVersionUID = 1L;
     	private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
     
     	private String username;
     	private String password;
     	private String drivername;
     	private String dbURL;
    -	private String query;
    +	private String queryTemplate;
     	private int resultSetType;
     	private int resultSetConcurrency;
     
     	private transient Connection dbConn;
     	private transient Statement statement;
     	private transient ResultSet resultSet;
    -
    -	private int[] columnTypes = null;
    +	
    +	private String splitColumnName;
    +	private long max;
    +	private long min;
    +	private long fetchSize;
    +	
    +	private boolean hasNext = true;
    +	
    +	private static final String BETWEEN = "(%s BETWEEN %s AND %s)";
    +	public static final String CONDITIONS = "$CONDITIONS";
     
     	public JDBCInputFormat() {
     	}
     
     	@Override
     	public void configure(Configuration parameters) {
    +		//called once per inputFormat (on open)
    +		try {
    +			establishConnection();
    --- End diff --
    
    I think I forgot to include the new connection management when I closed the previous branch.



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62832683
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -113,19 +192,7 @@ public void close() throws IOException {
     		try {
     			resultSet.close();
    --- End diff --
    
    Check for `resultSet == null` instead of catching the NPE?
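
A minimal sketch of the suggested guard, assuming a hypothetical helper named `QuietCloser` (not part of the PR). It accepts any `AutoCloseable`, which includes `java.sql.ResultSet`, and skips resources that were never opened instead of relying on a caught `NullPointerException`.

```java
// Hypothetical helper, for illustration only.
class QuietCloser {

    /**
     * Closes the resource if it was ever opened.
     *
     * @return true if close() was invoked, false if there was nothing to close
     */
    static boolean closeIfOpen(AutoCloseable resource) throws Exception {
        if (resource == null) {
            return false; // never opened, nothing to release
        }
        resource.close();
        return true;
    }
}
```

The explicit check keeps a genuine `NullPointerException` raised inside `close()` visible instead of silently swallowing it.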



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62833149
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -95,32 +95,32 @@ private void establishConnection() throws SQLException, ClassNotFoundException {
     		}
     	}
     
    -	private enum SupportedTypes {
    -		BOOLEAN,
    -		BYTE,
    -		SHORT,
    -		INTEGER,
    -		LONG,
    -		STRING,
    -		FLOAT,
    -		DOUBLE
    -	}
    -
     	/**
     	 * Adds a record to the prepared statement.
     	 * <p>
     	 * When this method is called, the output format is guaranteed to be opened.
    +	 * 
    +	 * WARNING: this may fail if the JDBC driver doesn't handle null correctly and no column types specified in the SqlRow
     	 *
     	 * @param tuple The records to add to the output.
     	 * @throws IOException Thrown, if the records could not be added due to an I/O problem.
     	 */
     	@Override
    -	public void writeRecord(OUT tuple) throws IOException {
    +	public void writeRecord(Row tuple) throws IOException {
     		try {
    -			if (types == null) {
    -				extractTypes(tuple);
    +			for (int index = 0; index < tuple.productArity(); index++) {
    +				if (tuple.productElement(index) == null && typesArray != null && typesArray.length > 0) {
    +					if (typesArray.length == tuple.productArity()) {
    --- End diff --
    
    Move this check out of the loop. We do not need to check this for every attribute. 



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62833018
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -314,6 +279,7 @@ public static JDBCInputFormatBuilder buildJDBCInputFormat() {
     
     		public JDBCInputFormatBuilder() {
     			this.format = new JDBCInputFormat();
    +			//use the "Firehose cursor" (see http://jtds.sourceforge.net/resultSets.html)
    --- End diff --
    
    Can you explain what this comment is about?



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62018432
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -139,12 +184,13 @@ public void close() throws IOException {
     	@Override
     	public boolean reachedEnd() throws IOException {
     		try {
    -			if (resultSet.isLast()) {
    -				close();
    +			if (!hasNext || resultSet.isLast()) {
    +				close();	
    --- End diff --
    
    I think `reachedEnd()` should not call `close()`. `close()` should be called by the `DataSourceTask` (or the other class handling the `InputFormat`)
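
The separation of concerns described here can be sketched without any Flink classes. In the hedged example below, `RecordReader` is a hypothetical stand-in for an input format and `drain` plays the role of the task that drives it: `reachedEnd()` only answers a question, and the caller alone decides when `close()` runs.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for an input format, for illustration only.
class RecordReader implements AutoCloseable {
    private final Iterator<String> source;
    private boolean closed;

    RecordReader(List<String> records) {
        this.source = records.iterator();
    }

    /** Pure query: reports whether records remain, without releasing anything. */
    boolean reachedEnd() {
        return !source.hasNext();
    }

    String nextRecord() {
        return source.next();
    }

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }

    /** Stands in for the driving task: it owns the lifecycle and closes exactly once. */
    static int drain(RecordReader reader) {
        int count = 0;
        try {
            while (!reader.reachedEnd()) {
                reader.nextRecord();
                count++;
            }
        } finally {
            reader.close();
        }
        return count;
    }
}
```

Because `reachedEnd()` has no side effects on resources, it can be called any number of times, and the same reader could be reopened for another split before it is finally closed.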



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r63173122
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCFullTest.java ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc.example;
    +
    +import java.sql.Types;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
    +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
    +import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
    +import org.apache.flink.api.java.io.jdbc.JDBCTestBase;
    +import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.junit.Test;
    +
    +public class JDBCFullTest extends JDBCTestBase {
    +	
    +	@Test
    +	public void test() throws Exception {
    --- End diff --
    
    Yes, previously this was an example and not a test. Not sure why it ended up in the `test` directory. 
    It is a good idea to have an end-to-end test, but we should also check that the job does what it should do.



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62018585
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -157,14 +203,25 @@ public boolean reachedEnd() throws IOException {
     	 * @throws java.io.IOException
     	 */
     	@Override
    -	public OUT nextRecord(OUT tuple) throws IOException {
    +	public Row nextRecord(Row row) throws IOException {
     		try {
    -			resultSet.next();
    -			if (columnTypes == null) {
    -				extractTypes(tuple);
    +			hasNext = resultSet.next();
    +			if(!hasNext) {
    +				return null;
    +			}
    +			try{
    +				//This throws a NPE when the TypeInfo is not passed to the InputFormat,
    +				//i.e. KryoSerializer used to generate the passed row
    +				row.productArity();
    --- End diff --
    
    Why not check for `row != null && row.productArity() == resultSetMetaData.getColumnCount()`? The `ResultSetMetaData` should be fetched in `open()`.



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62833701
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---
    @@ -19,180 +19,224 @@
     package org.apache.flink.api.java.io.jdbc;
     
     import java.io.IOException;
    -import java.sql.Connection;
    -import java.sql.DriverManager;
    -import java.sql.SQLException;
    -import java.sql.Statement;
     import java.sql.ResultSet;
     
    -
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
    +import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
    +import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
    +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.core.io.InputSplit;
     import org.junit.Assert;
    -
    -import org.apache.flink.api.java.tuple.Tuple2;
    -import org.apache.flink.api.java.tuple.Tuple5;
    -import org.junit.After;
    -import org.junit.AfterClass;
    -import org.junit.BeforeClass;
     import org.junit.Test;
     
    -public class JDBCInputFormatTest {
    -	JDBCInputFormat jdbcInputFormat;
    -
    -	static Connection conn;
    -
    -	static final Object[][] dbData = {
    -		{1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
    -		{1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
    -		{1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
    -		{1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
    -		{1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
    -
    -	@BeforeClass
    -	public static void setUpClass() {
    -		try {
    -			prepareDerbyDatabase();
    -		} catch (Exception e) {
    -			Assert.fail();
    -		}
    -	}
    -
    -	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
    -		System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL");
    -		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
    -		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
    -		conn = DriverManager.getConnection(dbURL);
    -		createTable();
    -		insertDataToSQLTable();
    -		conn.close();
    -	}
    -
    -	private static void createTable() throws SQLException {
    -		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
    -		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
    -		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
    -		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
    -		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
    -		sqlQueryBuilder.append("PRIMARY KEY (id))");
    -
    -		Statement stat = conn.createStatement();
    -		stat.executeUpdate(sqlQueryBuilder.toString());
    -		stat.close();
    -	}
    -
    -	private static void insertDataToSQLTable() throws SQLException {
    -		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
    -		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
    -		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
    -		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
    -		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
    -		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
    -
    -		Statement stat = conn.createStatement();
    -		stat.execute(sqlQueryBuilder.toString());
    -		stat.close();
    -	}
    -
    -	@AfterClass
    -	public static void tearDownClass() {
    -		cleanUpDerbyDatabases();
    -	}
    -
    -	private static void cleanUpDerbyDatabases() {
    -		try {
    -			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
    -			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
    -
    -			conn = DriverManager.getConnection(dbURL);
    -			Statement stat = conn.createStatement();
    -			stat.executeUpdate("DROP TABLE books");
    -			stat.close();
    -			conn.close();
    -		} catch (Exception e) {
    -			e.printStackTrace();
    -			Assert.fail();
    -		}
    -	}
    -
    -	@After
    -	public void tearDown() {
    -		jdbcInputFormat = null;
    -	}
    +public class JDBCInputFormatTest extends JDBCTestBase {
     
     	@Test(expected = IllegalArgumentException.class)
     	public void testInvalidDriver() throws IOException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
     				.setDrivername("org.apache.derby.jdbc.idontexist")
    -				.setDBUrl("jdbc:derby:memory:ebookshop")
    -				.setQuery("select * from books")
    +				.setDBUrl(DB_URL)
    +				.setQuery(SELECT_ALL_BOOKS)
     				.finish();
    -		jdbcInputFormat.open(null);
    +		jdbcInputFormat.openInputFormat();
     	}
     
     	@Test(expected = IllegalArgumentException.class)
     	public void testInvalidURL() throws IOException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    +				.setDrivername(DRIVER_CLASS)
     				.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
    -				.setQuery("select * from books")
    +				.setQuery(SELECT_ALL_BOOKS)
     				.finish();
    -		jdbcInputFormat.open(null);
    +		jdbcInputFormat.openInputFormat();
     	}
     
     	@Test(expected = IllegalArgumentException.class)
     	public void testInvalidQuery() throws IOException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    -				.setDBUrl("jdbc:derby:memory:ebookshop")
    +				.setDrivername(DRIVER_CLASS)
    +				.setDBUrl(DB_URL)
     				.setQuery("iamnotsql")
     				.finish();
    -		jdbcInputFormat.open(null);
    +		jdbcInputFormat.openInputFormat();
     	}
     
     	@Test(expected = IllegalArgumentException.class)
     	public void testIncompleteConfiguration() throws IOException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    -				.setQuery("select * from books")
    +				.setDrivername(DRIVER_CLASS)
    +				.setQuery(SELECT_ALL_BOOKS)
     				.finish();
     	}
     
    -	@Test(expected = IOException.class)
    -	public void testIncompatibleTuple() throws IOException {
    +	@Test
    +	public void testJDBCInputFormatWithoutParallelism() throws IOException, InstantiationException, IllegalAccessException {
     		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    -				.setDBUrl("jdbc:derby:memory:ebookshop")
    -				.setQuery("select * from books")
    +				.setDrivername(DRIVER_CLASS)
    +				.setDBUrl(DB_URL)
    +				.setQuery(SELECT_ALL_BOOKS)
    +				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
     				.finish();
    +		jdbcInputFormat.openInputFormat();
     		jdbcInputFormat.open(null);
    -		jdbcInputFormat.nextRecord(new Tuple2());
    +		Row row =  new Row(5);
    +		int recordCount = 0;
    +		while (!jdbcInputFormat.reachedEnd()) {
    +			Row next = jdbcInputFormat.nextRecord(row);
    +			if (next == null) {
    +				break;
    +			}
    +			
    +			if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());}
    +			if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());}
    +			if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());}
    +			if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());}
    +			if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());}
    +
    +			for (int x = 0; x < 5; x++) {
    +				if(testData[recordCount][x]!=null) {
    +					Assert.assertEquals(testData[recordCount][x], next.productElement(x));
    +				}
    +			}
    +			recordCount++;
    +		}
    +		jdbcInputFormat.close();
    +		jdbcInputFormat.closeInputFormat();
    +		Assert.assertEquals(testData.length, recordCount);
    +	}
    +	
    +	@Test
    +	public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException {
    +		final int fetchSize = 1;
    +		final Long min = new Long(JDBCTestBase.testData[0][0] + "");
    +		final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0] + "");
    +		ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
    +		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    +				.setDrivername(DRIVER_CLASS)
    +				.setDBUrl(DB_URL)
    +				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
    +				.setParametersProvider(pramProvider)
    +				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
    +				.finish();
    +		jdbcInputFormat.openInputFormat();
    +		InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
    +		int recordCount = 0;
    +		Row row =  new Row(5);
    +		for (int i = 0; i < splits.length; i++) {
    +			jdbcInputFormat.open(splits[i]);
    +			while (!jdbcInputFormat.reachedEnd()) {
    +				Row next = jdbcInputFormat.nextRecord(row);
    +				if (next == null) {
    +					break;
    +				}
    +				if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());}
    +				if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());}
    +				if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());}
    +				if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());}
    +				if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());}
    +	
    +				for (int x = 0; x < 5; x++) {
    +					if(testData[recordCount][x]!=null) {
    +						Assert.assertEquals(testData[recordCount][x], next.productElement(x));
    +					}
    +				}
    +				recordCount++;
    +			}
    +			jdbcInputFormat.close();
    +		}
    +		jdbcInputFormat.closeInputFormat();
    +		Assert.assertEquals(testData.length, recordCount);
     	}
    +	
    +	@Test
    +	public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException, InstantiationException, IllegalAccessException {
    +		Object[][] queryParameters = new Object[2][1];
    +		queryParameters[0] = new String[]{"Kumar"};
    +		queryParameters[1] = new String[]{"Tan Ah Teck"};
    +		ParameterValuesProvider paramProvider = new GenericParameterValuesProvider(queryParameters);
    +		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    +				.setDrivername(DRIVER_CLASS)
    +				.setDBUrl(DB_URL)
    +				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
    +				.setParametersProvider(paramProvider)
    +				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
    +				.finish();
    +		jdbcInputFormat.openInputFormat();
    +		InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
    --- End diff --
    
    check number of splits



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62833518
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java ---
    @@ -15,17 +15,21 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    -package org.apache.flink.api.java.io.jdbc;
    +package org.apache.flink.api.java.io.jdbc.split;
     
    -import java.io.OutputStream;
    +import java.io.Serializable;
    +
    +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
     
    -@SuppressWarnings("unused")
     /**
    - * Utility class to disable derby logging
    - */
    -public class DerbyUtil {
    -	public static final OutputStream DEV_NULL = new OutputStream() {
    -		public void write(int b) {
    -		}
    -	};
    + * 
    + * This interface is used by the {@link JDBCInputFormat} to compute the list of parallel query to run (i.e. splits).
    + * Each query will be parameterized using a row of the matrix provided by each {@link ParameterValuesProvider} implementation
    + * 
    + * */
    +public interface ParameterValuesProvider extends Serializable {
    --- End diff --
    
    The `ParameterValuesProvider` does not need to be `Serializable`. However, all values in the parameter values array must be; otherwise the `JDBCInputFormat` cannot be serialized. So the signature of `getParameterValues()` should be changed to `public Serializable[][] getParameterValues()`.
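
The point can be illustrated with plain Java serialization. This is a sketch under stated assumptions, not the PR's code: `ValuesProvider`, `FixedValuesProvider`, and `QueryFormat` are hypothetical stand-ins. The provider itself is deliberately not `Serializable`; the format keeps only the `Serializable[][]` matrix it returns, so the format survives a serialization round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical provider contract: the provider is not Serializable, its values are.
interface ValuesProvider {
    Serializable[][] getParameterValues();
}

class FixedValuesProvider implements ValuesProvider {
    private final Serializable[][] values;

    FixedValuesProvider(Serializable[][] values) {
        this.values = values;
    }

    @Override
    public Serializable[][] getParameterValues() {
        return values;
    }
}

// Hypothetical stand-in for the input format that is shipped to workers.
class QueryFormat implements Serializable {
    private static final long serialVersionUID = 1L;

    // Only the values are stored; the provider is used once and then dropped.
    private final Serializable[][] parameterValues;

    QueryFormat(ValuesProvider provider) {
        this.parameterValues = provider.getParameterValues();
    }

    Serializable[][] parameterValues() {
        return parameterValues;
    }

    /** One row of the matrix parameterizes one query in this sketch. */
    int splitCount() {
        return parameterValues.length;
    }

    /** Serializes and deserializes the format, as shipping it to another JVM would. */
    static QueryFormat roundTrip(QueryFormat format) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(format);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (QueryFormat) in.readObject();
        }
    }
}
```

Typing the matrix as `Serializable[][]` moves the failure from runtime (a `NotSerializableException` when the job is submitted) to compile time, since a non-serializable parameter value no longer type-checks.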



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1941#issuecomment-220028713
  
    Thanks for the update @fpompermaier. Just had a few minor comments. After that this PR should be good to merge.



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r63664643
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java ---
    @@ -156,88 +63,97 @@ public void testInvalidQuery() throws IOException {
     	@Test(expected = IllegalArgumentException.class)
     	public void testIncompleteConfiguration() throws IOException {
     		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    -				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
    +				.setDrivername(DRIVER_CLASS)
    +				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
     				.finish();
     	}
     
     
     	@Test(expected = IllegalArgumentException.class)
     	public void testIncompatibleTypes() throws IOException {
     		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    -				.setDBUrl("jdbc:derby:memory:ebookshop")
    -				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
    +				.setDrivername(DRIVER_CLASS)
    +				.setDBUrl(DB_URL)
    +				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
     				.finish();
     		jdbcOutputFormat.open(0, 1);
     
    -		Tuple5 tuple5 = new Tuple5();
     		tuple5.setField(4, 0);
     		tuple5.setField("hello", 1);
     		tuple5.setField("world", 2);
     		tuple5.setField(0.99, 3);
     		tuple5.setField("imthewrongtype", 4);
     
    -		jdbcOutputFormat.writeRecord(tuple5);
    +		Row row = new Row(tuple5.getArity());
    +		for (int i = 0; i < tuple5.getArity(); i++) {
    +			row.setField(i, tuple5.getField(i));
    +		}
    +		jdbcOutputFormat.writeRecord(row);
     		jdbcOutputFormat.close();
     	}
     
     	@Test
    -	public void testJDBCOutputFormat() throws IOException {
    -		String sourceTable = "books";
    -		String targetTable = "newbooks";
    -		String driverPath = "org.apache.derby.jdbc.EmbeddedDriver";
    -		String dbUrl = "jdbc:derby:memory:ebookshop";
    +	public void testJDBCOutputFormat() throws IOException, InstantiationException, IllegalAccessException {
    --- End diff --
    
    Can you change this test to not use the `JDBCInputFormat`, i.e., directly insert the test data and query it with SQL from the Derby instance? Otherwise, the test does not test only the `JDBCOutputFormat`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62019194
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -95,32 +95,32 @@ private void establishConnection() throws SQLException, ClassNotFoundException {
     		}
     	}
     
    -	private enum SupportedTypes {
    -		BOOLEAN,
    -		BYTE,
    -		SHORT,
    -		INTEGER,
    -		LONG,
    -		STRING,
    -		FLOAT,
    -		DOUBLE
    -	}
    -
     	/**
     	 * Adds a record to the prepared statement.
     	 * <p>
     	 * When this method is called, the output format is guaranteed to be opened.
    +	 * 
    +	 * WARNING: this may fail if the JDBC driver doesn't handle null correctly and no column types specified in the SqlRow
     	 *
     	 * @param tuple The records to add to the output.
     	 * @throws IOException Thrown, if the records could not be added due to an I/O problem.
     	 */
     	@Override
    -	public void writeRecord(OUT tuple) throws IOException {
    +	public void writeRecord(Row tuple) throws IOException {
     		try {
    -			if (types == null) {
    -				extractTypes(tuple);
    +			for (int index = 0; index < tuple.productArity(); index++) {
    +				if(tuple.productElement(index) ==null && typesArray != null && typesArray.length > 0) {
    +					if(typesArray.length == tuple.productArity()) {
    --- End diff --
    
    spaces `if (...)`



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62018169
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/JDBCInputSplitsGenerator.java ---
    @@ -15,17 +15,14 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    -package org.apache.flink.api.java.io.jdbc;
    +package org.apache.flink.api.java.io.jdbc.split;
     
    -import java.io.OutputStream;
    +import java.io.Serializable;
     
    -@SuppressWarnings("unused")
    -/**
    - * Utility class to disable derby logging
    - */
    -public class DerbyUtil {
    -	public static final OutputStream DEV_NULL = new OutputStream() {
    -		public void write(int b) {
    -		}
    -	};
    +import org.apache.flink.api.java.io.QueryParamInputSplit;
    +
    +public interface JDBCInputSplitsGenerator extends Serializable {
    +
    +	/** Returns the necessary parameters array to use for splitting a table */
    +	public QueryParamInputSplit[] getInputSplits(int minNumSplits);
    --- End diff --
    
    I would rather define a method `Object[][] getParameterValues(int numberOfQueries)`
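
A minimal sketch of what such a method could look like. The interface and class names here (`ParameterValuesSketch`, `FixedParameterValues`) are hypothetical and only illustrate the idea; they are not taken from the PR. Each row of the returned array would hold the binding values for one execution of the parameterized query.

```java
import java.io.Serializable;

/** Hypothetical interface: one Object[] of binding values per query execution. */
interface ParameterValuesSketch extends Serializable {
    Object[][] getParameterValues(int numberOfQueries);
}

/** Hypothetical provider that hands back values the user computed up front. */
final class FixedParameterValues implements ParameterValuesSketch {
    private static final long serialVersionUID = 1L;
    private final Object[][] values;

    FixedParameterValues(Object[][] values) {
        this.values = values;
    }

    @Override
    public Object[][] getParameterValues(int numberOfQueries) {
        // The hint is ignored here: the user already decided how many queries to run.
        return values;
    }
}
```

With this shape the input format only needs to know how many rows there are and can bind row `i` to the prepared statement for split `i`.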



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r63703304
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java ---
    @@ -102,58 +116,62 @@ public void testJDBCOutputFormat() throws IOException, InstantiationException, I
     				.finish();
     		jdbcOutputFormat.open(0, 1);
     
    -		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername(DRIVER_CLASS)
    -				.setDBUrl(DB_URL)
    -				.setQuery(SELECT_ALL_BOOKS)
    -				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
    -				.finish();
    -		jdbcInputFormat.openInputFormat();
    -		jdbcInputFormat.open(null);
    -
    -		Row row = new Row(tuple5.getArity());
    -		while (!jdbcInputFormat.reachedEnd()) {
    -			if (jdbcInputFormat.nextRecord(row) != null) {
    +		try (
    +				Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL);
    --- End diff --
    
    Can't you simply use `JDBCTestBase.testData` instead of reading it from the DB?
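
A rough sketch of that idea, assuming the test data is available as an `Object[][]`. The `TEST_DATA` array below is a made-up stand-in and not the content of `JDBCTestBase.testData`; the class and method names are hypothetical as well.

```java
import java.util.ArrayList;
import java.util.List;

final class InMemoryTestData {

    // Made-up stand-in for JDBCTestBase.testData; the real values live in the test base class.
    static final Object[][] TEST_DATA = {
        {1, "title-1", "author-1", 9.99, 3},
        {2, "title-2", "author-2", null, 5}
    };

    private InMemoryTestData() {
    }

    /** Copies every row of the array into a fresh record, null fields included. */
    static List<Object[]> toRecords(Object[][] data) {
        List<Object[]> records = new ArrayList<>(data.length);
        for (Object[] source : data) {
            Object[] record = new Object[source.length];
            for (int i = 0; i < source.length; i++) {
                record[i] = source[i];
            }
            records.add(record);
        }
        return records;
    }
}
```

The records built this way could be handed to the output format directly, so the test would no longer depend on a first read from the database.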



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62018227
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -21,57 +21,103 @@
     import java.io.IOException;
     import java.sql.Connection;
     import java.sql.DriverManager;
    +import java.sql.PreparedStatement;
     import java.sql.ResultSet;
     import java.sql.ResultSetMetaData;
     import java.sql.SQLException;
    -import java.sql.Statement;
    -
    -import org.apache.flink.api.common.io.NonParallelInput;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
    +import java.util.Arrays;
     
     import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
     import org.apache.flink.api.common.io.RichInputFormat;
     import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.java.io.QueryParamInputSplit;
    +import org.apache.flink.api.java.io.jdbc.split.JDBCInputSplitsGenerator;
     import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.table.Row;
     import org.apache.flink.configuration.Configuration;
     import org.apache.flink.core.io.GenericInputSplit;
     import org.apache.flink.core.io.InputSplit;
     import org.apache.flink.core.io.InputSplitAssigner;
    -import org.apache.flink.types.NullValue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     /**
      * InputFormat to read data from a database and generate tuples.
      * The InputFormat has to be configured using the supplied InputFormatBuilder.
      * 
    + * Remark: split (if set) works only if the split column is numeric
    + * 
      * @param <OUT>
      * @see Tuple
      * @see DriverManager
      */
    -public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
    -	private static final long serialVersionUID = 1L;
    +public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> {
    --- End diff --
    
    Can you update the JavaDocs of this class and its methods?



[GitHub] flink pull request: Flink 3750 fixed

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1941



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62833050
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -352,6 +318,10 @@ public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency)
     			format.resultSetConcurrency = resultSetConcurrency;
     			return this;
     		}
    +		public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider) {
    --- End diff --
    
    Add new line above



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1941#issuecomment-220133396
  
    Merging



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1941#issuecomment-220094095
  
    Thanks for the update. Looks good. Will merge it later :-)



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1941#issuecomment-216232502
  
    Thanks for the PR @fpompermaier.
    I think the new format is tailored a bit too much towards certain query templates (a `BETWEEN` predicate on an integer column). Also, modifying queries that users provide is a bit risky, IMO.
    
    To make it more general, I would propose to:
    - Accept query templates with markers, similar to parameter markers in prepared statements: `SELECT address FROM people WHERE name = ? AND birthday BETWEEN ? AND ?`.
    - Let users explicitly provide bounds. Users should know their data best and can provide bounds which take skewed distributions into account. Parameter values can be provided as `Object[]`, one array for each parameter. We can provide some utility methods to help users generate uniformly distributed parameter values.
    - Let the `InputSplit` carry the index into the parameter value array instead of two bound values, so each instance can build its query by substituting the parameters with values.
    
    What do you think?
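
To make the utility-method idea a bit more concrete, here is a rough sketch of a helper that produces uniformly sized `BETWEEN` bounds for a numeric column. The class name `NumericRangeParameters` and its `between` method are hypothetical; the sketch also assumes that `max - min + 1` does not overflow a `long`.

```java
/** Hypothetical helper that builds BETWEEN bounds for a numeric column. */
final class NumericRangeParameters {

    private NumericRangeParameters() {
    }

    /**
     * Splits the inclusive range [min, max] into at most numSplits contiguous
     * sub-ranges and returns one {lower, upper} pair per sub-range.
     */
    static Object[][] between(long min, long max, int numSplits) {
        if (numSplits < 1 || max < min) {
            throw new IllegalArgumentException("need numSplits >= 1 and max >= min");
        }
        long total = max - min + 1;
        int splits = (int) Math.min(numSplits, total);
        long base = total / splits;
        long remainder = total % splits;

        Object[][] params = new Object[splits][];
        long lower = min;
        for (int i = 0; i < splits; i++) {
            // The first 'remainder' ranges take one extra value.
            long size = base + (i < remainder ? 1 : 0);
            long upper = lower + size - 1;
            params[i] = new Object[] {lower, upper};
            lower = upper + 1;
        }
        return params;
    }
}
```

A split would then only carry an index `i`, and the reading instance would bind `params[i][0]` and `params[i][1]` to the two `?` markers of a template such as `... WHERE id BETWEEN ? AND ?`. Users with skewed data could skip the helper and supply their own array.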




[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r61735650
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -26,52 +26,69 @@
     import java.sql.SQLException;
     import java.sql.Statement;
     
    -import org.apache.flink.api.common.io.NonParallelInput;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
    -
     import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
     import org.apache.flink.api.common.io.RichInputFormat;
     import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.java.io.RangeInputSplit;
    +import org.apache.flink.api.java.io.GenericRow;
     import org.apache.flink.api.java.tuple.Tuple;
     import org.apache.flink.configuration.Configuration;
     import org.apache.flink.core.io.GenericInputSplit;
     import org.apache.flink.core.io.InputSplit;
     import org.apache.flink.core.io.InputSplitAssigner;
    -import org.apache.flink.types.NullValue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     /**
      * InputFormat to read data from a database and generate tuples.
      * The InputFormat has to be configured using the supplied InputFormatBuilder.
      * 
    + * Remark: split (if set) works only if the split column is numeric
    + * 
      * @param <OUT>
      * @see Tuple
      * @see DriverManager
      */
    -public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
    -	private static final long serialVersionUID = 1L;
    +public class JDBCInputFormat extends RichInputFormat<GenericRow, InputSplit> {
     
    +	private static final long serialVersionUID = 1L;
     	private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
     
     	private String username;
     	private String password;
     	private String drivername;
     	private String dbURL;
    -	private String query;
    +	private String queryTemplate;
     	private int resultSetType;
     	private int resultSetConcurrency;
     
     	private transient Connection dbConn;
     	private transient Statement statement;
     	private transient ResultSet resultSet;
    -
    -	private int[] columnTypes = null;
    +	
    +	private String splitColumnName;
    +	private long max;
    +	private long min;
    +	private long fetchSize;
    +	
    +	private boolean hasNext = true;
    +	
    +	private static final String BETWEEN = "(%s BETWEEN %s AND %s)";
    +	public static final String CONDITIONS = "$CONDITIONS";
     
     	public JDBCInputFormat() {
     	}
     
     	@Override
     	public void configure(Configuration parameters) {
    +		//called once per inputFormat (on open)
    +		try {
    +			establishConnection();
    --- End diff --
    
    Use the new methods in `RichInputFormat` (`openInputFormat()` and `closeInputFormat()`).
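
A simplified illustration of the intended lifecycle, using a hypothetical stand-in class rather than the real `RichInputFormat`: the connection is set up once per parallel instance, while `open()` and `close()` run once per split.

```java
/** Hypothetical stand-in that only tracks how often each lifecycle step runs. */
final class LifecycleSketch {
    private boolean connected;
    int connectionsOpened;
    int splitsRead;

    /** Called once per parallel instance: the place for the connection and statement. */
    void openInputFormat() {
        connected = true;
        connectionsOpened++;
    }

    /** Called once per input split: would only bind parameters and run the query. */
    void open(int splitNumber) {
        if (!connected) {
            throw new IllegalStateException("openInputFormat() must run before open()");
        }
        splitsRead++;
    }

    /** Called once per input split: would close the ResultSet. */
    void close() {
    }

    /** Called once per parallel instance: would close the statement and the connection. */
    void closeInputFormat() {
        connected = false;
    }
}
```

Compared to opening the connection in `configure()`, this keeps the expensive setup out of a method that may also be invoked on the client side.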



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r61735803
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/GenericRow.java ---
    @@ -0,0 +1,123 @@
    +package org.apache.flink.api.java.io;
    +
    +import java.beans.Transient;
    +import java.util.Arrays;
    +
    +import org.apache.flink.api.java.tuple.Tuple;
    +
    +/**
    + * This class represent a row as a POJO
    + * 
    + * */
    +public class GenericRow implements java.io.Serializable {
    --- End diff --
    
    Can you use the `Row` type from `flink-table` instead? If yes, please add the dependency to the `pom.xml` of `flink-jdbc`.



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1941#issuecomment-218438515
  
    Thanks for the update @fpompermaier. Overall the PR looks good. I added a few comments and suggestions. Thanks, Fabian



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62024853
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericSplitsGenerator.java ---
    @@ -0,0 +1,41 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io.jdbc.split;
    +
    +import org.apache.flink.api.java.io.QueryParamInputSplit;
    +
    +/** 
    + * 
    + * This splits generator allows the apply user defined splits (computed outside the IF) 
    + * 
    + * */
    +public class GenericSplitsGenerator implements JDBCInputSplitsGenerator {
    --- End diff --
    
    Please rename to `GenericParameterValueProvider`



[GitHub] flink pull request: Flink 3750 fixed

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62021375
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -157,14 +203,25 @@ public boolean reachedEnd() throws IOException {
     	 * @throws java.io.IOException
     	 */
     	@Override
    -	public OUT nextRecord(OUT tuple) throws IOException {
    +	public Row nextRecord(Row row) throws IOException {
     		try {
    -			resultSet.next();
    -			if (columnTypes == null) {
    -				extractTypes(tuple);
    +			hasNext = resultSet.next();
    +			if(!hasNext) {
    +				return null;
    +			}
    +			try{
    +				//This throws a NPE when the TypeInfo is not passed to the InputFormat,
    +				//i.e. KryoSerializer used to generate the passed row
    +				row.productArity();
    +			}catch(NullPointerException npe) {
    --- End diff --
    
    where? just before the catch?



[GitHub] flink pull request: Flink 3750 fixed

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62021965
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -157,14 +203,25 @@ public boolean reachedEnd() throws IOException {
     	 * @throws java.io.IOException
     	 */
     	@Override
    -	public OUT nextRecord(OUT tuple) throws IOException {
    +	public Row nextRecord(Row row) throws IOException {
     		try {
    -			resultSet.next();
    -			if (columnTypes == null) {
    -				extractTypes(tuple);
    +			hasNext = resultSet.next();
    +			if(!hasNext) {
    +				return null;
    +			}
    +			try{
    +				//This throws a NPE when the TypeInfo is not passed to the InputFormat,
    +				//i.e. KryoSerializer used to generate the passed row
    +				row.productArity();
    +			}catch(NullPointerException npe) {
    --- End diff --
    
    L209: after if
    L212: after try
    L216: before & after catch



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62832266
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -19,59 +19,112 @@
     package org.apache.flink.api.java.io.jdbc;
     
     import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.sql.Array;
     import java.sql.Connection;
    +import java.sql.Date;
     import java.sql.DriverManager;
    +import java.sql.PreparedStatement;
     import java.sql.ResultSet;
     import java.sql.ResultSetMetaData;
     import java.sql.SQLException;
    -import java.sql.Statement;
    -
    -import org.apache.flink.api.common.io.NonParallelInput;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.Arrays;
     
     import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
     import org.apache.flink.api.common.io.RichInputFormat;
     import org.apache.flink.api.common.io.statistics.BaseStatistics;
    -import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
    +import org.apache.flink.api.table.Row;
     import org.apache.flink.configuration.Configuration;
     import org.apache.flink.core.io.GenericInputSplit;
     import org.apache.flink.core.io.InputSplit;
     import org.apache.flink.core.io.InputSplitAssigner;
    -import org.apache.flink.types.NullValue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     /**
    - * InputFormat to read data from a database and generate tuples.
    + * InputFormat to read data from a database and generate Rows.
      * The InputFormat has to be configured using the supplied InputFormatBuilder.
      * 
    - * @param <OUT>
    - * @see Tuple
    + * In order to query the JDBC source in parallel, you need to provide a parameterized
    + * query template (i.e. a valid {@link PreparedStatement}) and a {@link ParameterValuesProvider} 
    + * which provides binding values for the query parameters.
    + * 
    + * @see Row
    + * @see ParameterValuesProvider
    + * @see PreparedStatement
      * @see DriverManager
      */
    -public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
    -	private static final long serialVersionUID = 1L;
    +public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> {
     
    +	private static final long serialVersionUID = 1L;
     	private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
     
     	private String username;
     	private String password;
     	private String drivername;
     	private String dbURL;
    -	private String query;
    +	private String queryTemplate;
     	private int resultSetType;
     	private int resultSetConcurrency;
     
     	private transient Connection dbConn;
    -	private transient Statement statement;
    +	private transient PreparedStatement statement;
     	private transient ResultSet resultSet;
    -
    -	private int[] columnTypes = null;
    -
    +	
    +	private boolean hasNext = true;
    +	private Object[][] parameterValues;
    +	
     	public JDBCInputFormat() {
     	}
     
     	@Override
     	public void configure(Configuration parameters) {
    +		//do nothing here
    +	}
    +	
    +	@Override
    +	public void openInputFormat() {
    +		//called once per inputFormat (on open)
    +		try {
    +			Class.forName(drivername);
    +			if (username == null) {
    +				dbConn = DriverManager.getConnection(dbURL);
    +			} else {
    +				dbConn = DriverManager.getConnection(dbURL, username, password);
    +			}
    +			statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
    +		} catch (SQLException se) {
    +			throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
    +		} catch (ClassNotFoundException cnfe) {
    +			throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
    +		}
    +	}
    +	
    +	@Override
    +	public void closeInputFormat() {
    +		//called once per inputFormat (on close)
    +		try {
    +			statement.close();
    +		} catch (SQLException se) {
    +			LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage());
    +		} catch (NullPointerException npe) {
    --- End diff --
    
    Check for `statement == null` instead of catching a NPE.
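
Something along these lines, as a sketch only. The helper class `StatementCloser` is hypothetical, and it prints to `System.err` where the real code would use its logger.

```java
import java.sql.SQLException;
import java.sql.Statement;

final class StatementCloser {

    private StatementCloser() {
    }

    /** Closes the statement if there is one; returns true only if close() succeeded. */
    static boolean closeIfOpen(Statement statement) {
        if (statement == null) {
            // Nothing was opened, for example because openInputFormat() failed early.
            return false;
        }
        try {
            statement.close();
            return true;
        } catch (SQLException se) {
            System.err.println("Statement couldn't be closed - " + se.getMessage());
            return false;
        }
    }
}
```

The explicit check makes the expected case visible and avoids swallowing an unrelated `NullPointerException` thrown from inside the driver.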



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62832789
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -157,14 +216,25 @@ public boolean reachedEnd() throws IOException {
     	 * @throws java.io.IOException
     	 */
     	@Override
    -	public OUT nextRecord(OUT tuple) throws IOException {
    +	public Row nextRecord(Row row) throws IOException {
     		try {
    -			resultSet.next();
    -			if (columnTypes == null) {
    -				extractTypes(tuple);
    +			hasNext = resultSet.next();
    --- End diff --
    
    If `resultSet.next()` is called at the end of `open()`, we should call it again at the end of `nextRecord()`.
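
The look-ahead pattern meant here, sketched with a plain `Iterator` standing in for the `ResultSet`. `LookAheadReader` is a hypothetical class; with a real `ResultSet` the fields of the current row would have to be copied into the `Row` before the cursor is advanced.

```java
import java.util.Iterator;

/** Hypothetical stand-in for the format; an Iterator plays the role of the ResultSet. */
final class LookAheadReader<T> {
    private final Iterator<T> source;
    private T next;
    private boolean hasNext;

    LookAheadReader(Iterator<T> source) {
        this.source = source;
        advance(); // corresponds to calling resultSet.next() at the end of open()
    }

    boolean reachedEnd() {
        return !hasNext;
    }

    T nextRecord() {
        if (!hasNext) {
            return null;
        }
        T current = next;
        advance(); // corresponds to calling resultSet.next() at the end of nextRecord()
        return current;
    }

    private void advance() {
        hasNext = source.hasNext();
        next = hasNext ? source.next() : null;
    }
}
```

This way `reachedEnd()` is always accurate before `nextRecord()` is called, and `nextRecord()` does not need to return `null` to signal the end of the input.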



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r63703169
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java ---
    @@ -102,58 +116,62 @@ public void testJDBCOutputFormat() throws IOException, InstantiationException, I
     				.finish();
     		jdbcOutputFormat.open(0, 1);
     
    -		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername(DRIVER_CLASS)
    -				.setDBUrl(DB_URL)
    -				.setQuery(SELECT_ALL_BOOKS)
    -				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
    -				.finish();
    -		jdbcInputFormat.openInputFormat();
    -		jdbcInputFormat.open(null);
    -
    -		Row row = new Row(tuple5.getArity());
    -		while (!jdbcInputFormat.reachedEnd()) {
    -			if (jdbcInputFormat.nextRecord(row) != null) {
    +		try (
    +				Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL);
    +				PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_BOOKS);
    +				ResultSet resultSet = statement.executeQuery();
    +				) {
    +			while (resultSet.next()) {
    +				Row row = new Row(tuple5.getArity());
    +				for (int i = 0; i < tuple5.getArity(); i++) {
    +					row.setField(i, resultSet.getObject(i+1));
    +				}
     				jdbcOutputFormat.writeRecord(row);
     			}
    +		} catch (SQLException e) {
    +			Assert.fail("JDBC OutputFormat test failed. " + e.getMessage());
     		}
     
     		jdbcOutputFormat.close();
    -		jdbcInputFormat.close();
    -		jdbcInputFormat.closeInputFormat();
     
    -		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    -				.setDrivername(DRIVER_CLASS)
    -				.setDBUrl(DB_URL)
    -				.setQuery(SELECT_ALL_NEWBOOKS)
    -				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
    -				.finish();
    -		jdbcInputFormat.openInputFormat();
    -		jdbcInputFormat.open(null);
    -
    -		int recordCount = 0;
    -		while (!jdbcInputFormat.reachedEnd()) {
    -			row = jdbcInputFormat.nextRecord(row);
    -			if (row == null) {
    -				break;
    -			}
    -			if(row.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, row.productElement(0).getClass());}
    -			if(row.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, row.productElement(1).getClass());}
    -			if(row.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, row.productElement(2).getClass());}
    -			if(row.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, row.productElement(3).getClass());}
    -			if(row.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, row.productElement(4).getClass());}
    -
    -			for (int x = 0; x < 5; x++) {
    -				if(JDBCTestBase.testData[recordCount][x]!=null) {
    -					Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.productElement(x));
    +		try (
    +				Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL);
    +				PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
    +				ResultSet resultSet = statement.executeQuery();
    +				) {
    +			int recordCount = 0;
    +			while (resultSet.next()) {
    +				Row row = new Row(tuple5.getArity());
    +				for (int i = 0; i < tuple5.getArity(); i++) {
    +					row.setField(i, resultSet.getObject(i + 1));
    +				}
    +				if (row.productElement(0) != null) {
    +					Assert.assertEquals("Field 0 should be int", Integer.class, row.productElement(0).getClass());
    +				}
    +				if (row.productElement(1) != null) {
    +					Assert.assertEquals("Field 1 should be String", String.class, row.productElement(1).getClass());
    +				}
    +				if (row.productElement(2) != null) {
    +					Assert.assertEquals("Field 2 should be String", String.class, row.productElement(2).getClass());
    +				}
    +				if (row.productElement(3) != null) {
    +					Assert.assertEquals("Field 3 should be float", Double.class, row.productElement(3).getClass());
    +				}
    +				if (row.productElement(4) != null) {
    +					Assert.assertEquals("Field 4 should be int", Integer.class, row.productElement(4).getClass());
     				}
    -			}
     
    -			recordCount++;
    -		}
    -		Assert.assertEquals(JDBCTestBase.testData.length, recordCount);
    +				for (int x = 0; x < tuple5.getArity(); x++) {
    +					if(JDBCTestBase.testData[recordCount][x]!=null) {
    --- End diff --
    
    Please add spaces around `!=`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Flink 3750 fixed

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62019523
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---
    @@ -157,14 +203,25 @@ public boolean reachedEnd() throws IOException {
     	 * @throws java.io.IOException
     	 */
     	@Override
    -	public OUT nextRecord(OUT tuple) throws IOException {
    +	public Row nextRecord(Row row) throws IOException {
     		try {
    -			resultSet.next();
    -			if (columnTypes == null) {
    -				extractTypes(tuple);
    +			hasNext = resultSet.next();
    +			if(!hasNext) {
    +				return null;
    +			}
    +			try{
    +				//This throws a NPE when the TypeInfo is not passed to the InputFormat,
    +				//i.e. KryoSerializer used to generate the passed row
    +				row.productArity();
    --- End diff --
    
    The problem is that the Row object is instantiated by the DataSourceTask. If no type info is provided, the Row is created by the KryoSerializer; in that case `row.fields` is null and there is no way to initialize it (or at least I don't know how). Is there a way to initialize the fields?
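
One possible workaround, sketched here in isolation, is to treat the reuse object as optional: if the row handed in has no allocated fields, allocate a fresh one with the expected number of columns (which in the real input format could presumably be taken from `ResultSetMetaData.getColumnCount()`). The `SketchRow` class, the `ensureUsable` and `fill` helpers, and the sample record are hypothetical stand-ins for illustration only; this is not Flink's `Row` and not the code in this PR.

```java
import java.util.Arrays;

public class RowReuseSketch {

    /** Hypothetical stand-in for a row type; NOT Flink's Row class. */
    static final class SketchRow {
        Object[] fields; // stays null when the instance is created without an arity

        SketchRow() { }  // mimics an instance whose fields were never allocated

        SketchRow(int arity) {
            this.fields = new Object[arity];
        }

        void setField(int index, Object value) {
            fields[index] = value; // throws NullPointerException if fields is null
        }
    }

    /** Returns the reuse row if it is usable, otherwise a freshly allocated row. */
    static SketchRow ensureUsable(SketchRow reuse, int columnCount) {
        if (reuse == null || reuse.fields == null || reuse.fields.length != columnCount) {
            return new SketchRow(columnCount);
        }
        return reuse;
    }

    /** Copies one record into the row, allocating a new row only when needed. */
    static SketchRow fill(SketchRow reuse, Object[] record) {
        SketchRow row = ensureUsable(reuse, record.length);
        for (int i = 0; i < record.length; i++) {
            row.setField(i, record[i]);
        }
        return row;
    }

    public static void main(String[] args) {
        // Illustrative values only, not taken from the test data of this PR.
        Object[] record = {1, "some title", "some author", 9.99, 5};

        SketchRow broken = new SketchRow();      // fields == null
        SketchRow filled = fill(broken, record); // falls back to a new row
        System.out.println(Arrays.toString(filled.fields));
    }
}
```

The trade-off of this approach is that object reuse is silently lost whenever the incoming row is unusable, so it avoids the NullPointerException without needing a try/catch around `productArity()`, but it does not address why the row arrives uninitialized.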



[GitHub] flink pull request: Flink 3750 fixed

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r63680024
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java ---
    @@ -156,88 +63,97 @@ public void testInvalidQuery() throws IOException {
     	@Test(expected = IllegalArgumentException.class)
     	public void testIncompleteConfiguration() throws IOException {
     		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    -				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
    +				.setDrivername(DRIVER_CLASS)
    +				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
     				.finish();
     	}
     
     
     	@Test(expected = IllegalArgumentException.class)
     	public void testIncompatibleTypes() throws IOException {
     		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
    -				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    -				.setDBUrl("jdbc:derby:memory:ebookshop")
    -				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
    +				.setDrivername(DRIVER_CLASS)
    +				.setDBUrl(DB_URL)
    +				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
     				.finish();
     		jdbcOutputFormat.open(0, 1);
     
    -		Tuple5 tuple5 = new Tuple5();
     		tuple5.setField(4, 0);
     		tuple5.setField("hello", 1);
     		tuple5.setField("world", 2);
     		tuple5.setField(0.99, 3);
     		tuple5.setField("imthewrongtype", 4);
     
    -		jdbcOutputFormat.writeRecord(tuple5);
    +		Row row = new Row(tuple5.getArity());
    +		for (int i = 0; i < tuple5.getArity(); i++) {
    +			row.setField(i, tuple5.getField(i));
    +		}
    +		jdbcOutputFormat.writeRecord(row);
     		jdbcOutputFormat.close();
     	}
     
     	@Test
    -	public void testJDBCOutputFormat() throws IOException {
    -		String sourceTable = "books";
    -		String targetTable = "newbooks";
    -		String driverPath = "org.apache.derby.jdbc.EmbeddedDriver";
    -		String dbUrl = "jdbc:derby:memory:ebookshop";
    +	public void testJDBCOutputFormat() throws IOException, InstantiationException, IllegalAccessException {
    --- End diff --
    
    OK, I just updated the old code here. No problem.



[GitHub] flink pull request: Flink 3750 fixed

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1941#discussion_r62834121
  
    --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java ---
    @@ -0,0 +1,182 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.sql.Connection;
    +import java.sql.DriverManager;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +
    +import org.junit.After;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +
    +/**
    + * Base test class for JDBC Input and Output formats
    + */
    +public class JDBCTestBase {
    +	
    +	public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
    +	public static final String DB_URL = "jdbc:derby:memory:ebookshop";
    +	public static final String INPUT_TABLE = "books";
    +	public static final String OUTPUT_TABLE = "newbooks";
    +	public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
    +	public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE;
    +	public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0";
    +	public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)";
    +	public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
    +	public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?";
    +	
    +	protected JDBCInputFormat jdbcInputFormat;
    +	protected JDBCOutputFormat jdbcOutputFormat;
    +
    +	protected static Connection conn;
    +
    +	public static final Object[][] testData = {
    --- End diff --
    
    Add one or two records with `null` values to the test data.
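
As a rough illustration of what such records could look like, the sketch below uses the column order (id, title, author, price, qty) from `INSERT_TEMPLATE` and the Java types asserted in the output format test (Integer, String, String, Double, Integer). The concrete values, the class name `NullTestDataSketch`, and the helpers `typeMatches` and `countNulls` are made up for this example and are not the PR's actual test data.

```java
public class NullTestDataSketch {

    // Illustrative rows only; columns are (id, title, author, price, qty).
    static final Object[][] TEST_DATA = {
        {1, "some title", "some author", 9.99, 5},
        {2, "title without author", null, 19.99, 3},
        {3, null, "author without title", null, 0},
    };

    // Expected Java type per column, matching the assertions in the test.
    static final Class<?>[] EXPECTED_TYPES = {
        Integer.class, String.class, String.class, Double.class, Integer.class
    };

    /** A null value is accepted for any column; otherwise the type must match. */
    static boolean typeMatches(Object value, Class<?> expected) {
        return value == null || expected.isInstance(value);
    }

    /** Counts the null cells in the data set. */
    static int countNulls(Object[][] data) {
        int nulls = 0;
        for (Object[] record : data) {
            for (Object value : record) {
                if (value == null) {
                    nulls++;
                }
            }
        }
        return nulls;
    }

    public static void main(String[] args) {
        for (Object[] record : TEST_DATA) {
            for (int i = 0; i < record.length; i++) {
                if (!typeMatches(record[i], EXPECTED_TYPES[i])) {
                    throw new IllegalStateException("Unexpected type in column " + i);
                }
            }
        }
        System.out.println("null cells: " + countNulls(TEST_DATA));
    }
}
```

Records like these would exercise the `!= null` guards that the tests already place around the per-field type assertions.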

