You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:34 UTC
[04/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-jdbc
[FLINK-6711] Activate strict checkstyle for flink-jdbc
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23920bb8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23920bb8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23920bb8
Branch: refs/heads/master
Commit: 23920bb88eed1f6d1bfe55fb65b5013a784930ba
Parents: d4f7339
Author: zentol <ch...@apache.org>
Authored: Wed May 24 22:55:11 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:08 2017 +0200
----------------------------------------------------------------------
flink-connectors/flink-jdbc/pom.xml | 6 +-
.../flink/api/java/io/jdbc/JDBCInputFormat.java | 48 ++++++-----
.../api/java/io/jdbc/JDBCOutputFormat.java | 85 ++++++++++----------
.../split/GenericParameterValuesProvider.java | 13 ++-
.../split/NumericBetweenParametersProvider.java | 24 +++---
.../io/jdbc/split/ParameterValuesProvider.java | 17 ++--
.../flink/api/java/io/jdbc/JDBCFullTest.java | 22 ++---
.../api/java/io/jdbc/JDBCInputFormatTest.java | 34 ++++----
.../api/java/io/jdbc/JDBCOutputFormatTest.java | 15 ++--
.../flink/api/java/io/jdbc/JDBCTestBase.java | 34 ++++----
.../src/test/resources/log4j-test.properties | 2 +-
11 files changed, 159 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml
index a2bbaf4..0704dc8 100644
--- a/flink-connectors/flink-jdbc/pom.xml
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -18,11 +18,11 @@ specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-
+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
+
<modelVersion>4.0.0</modelVersion>
-
+
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connectors</artifactId>
http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index e714867..835fb23 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -18,38 +18,39 @@
package org.apache.flink.api.java.io.jdbc;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.sql.Array;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Arrays;
-
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.types.Row;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.types.Row;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+
/**
* InputFormat to read data from a database and generate Rows.
* The InputFormat has to be configured using the supplied InputFormatBuilder.
- * A valid RowTypeInfo must be properly configured in the builder, e.g.: </br>
+ * A valid RowTypeInfo must be properly configured in the builder, e.g.:
*
* <pre><code>
* TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
@@ -70,10 +71,10 @@ import org.slf4j.LoggerFactory;
* .finish();
* </code></pre>
*
- * In order to query the JDBC source in parallel, you need to provide a
+ * <p>In order to query the JDBC source in parallel, you need to provide a
* parameterized query template (i.e. a valid {@link PreparedStatement}) and
* a {@link ParameterValuesProvider} which provides binding values for the
- * query parameters. E.g.:</br>
+ * query parameters. E.g.:
*
* <pre><code>
*
@@ -151,7 +152,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
public void closeInputFormat() {
//called once per inputFormat (on close)
try {
- if(statement != null) {
+ if (statement != null) {
statement.close();
}
} catch (SQLException se) {
@@ -161,7 +162,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
}
try {
- if(dbConn != null) {
+ if (dbConn != null) {
dbConn.close();
}
} catch (SQLException se) {
@@ -221,7 +222,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
statement.setArray(i + 1, (Array) param);
} else {
//extends with other types if needed
- throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet)." );
+ throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet).");
}
}
if (LOG.isDebugEnabled()) {
@@ -242,7 +243,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
*/
@Override
public void close() throws IOException {
- if(resultSet == null) {
+ if (resultSet == null) {
return;
}
try {
@@ -264,7 +265,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
}
/**
- * Stores the next resultSet row in a tuple
+ * Stores the next resultSet row in a tuple.
*
* @param row row to be reused.
* @return row containing next {@link Row}
@@ -319,6 +320,9 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
return new JDBCInputFormatBuilder();
}
+ /**
+ * Builder for a {@link JDBCInputFormat}.
+ */
public static class JDBCInputFormatBuilder {
private final JDBCInputFormat format;
http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index c5585e2..3f2ad33 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -18,52 +18,53 @@
package org.apache.flink.api.java.io.jdbc;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.types.Row;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
/**
* OutputFormat to write tuples into a database.
* The OutputFormat has to be configured using the supplied OutputFormatBuilder.
- *
+ *
* @see Tuple
* @see DriverManager
*/
public class JDBCOutputFormat extends RichOutputFormat<Row> {
private static final long serialVersionUID = 1L;
-
+
private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);
-
+
private String username;
private String password;
private String drivername;
private String dbURL;
private String query;
private int batchInterval = 5000;
-
+
private Connection dbConn;
private PreparedStatement upload;
-
+
private int batchCount = 0;
-
+
public int[] typesArray;
-
+
public JDBCOutputFormat() {
}
-
+
@Override
public void configure(Configuration parameters) {
}
-
+
/**
* Connects to the target database and initializes the prepared statement.
*
@@ -82,7 +83,7 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
}
}
-
+
private void establishConnection() throws SQLException, ClassNotFoundException {
Class.forName(drivername);
if (username == null) {
@@ -91,14 +92,13 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
dbConn = DriverManager.getConnection(dbURL, username, password);
}
}
-
+
/**
* Adds a record to the prepared statement.
- * <p>
- * When this method is called, the output format is guaranteed to be opened.
- * </p>
- *
- * WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
+ *
+ * <p>When this method is called, the output format is guaranteed to be opened.
+ *
+ * <p>WARNING: this may fail when no column types are specified (because a best effort approach is attempted in order to
* insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null))
*
* @param row The records to add to the output.
@@ -110,10 +110,10 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
- }
+ }
try {
- if (typesArray == null ) {
+ if (typesArray == null) {
// no types provided
for (int index = 0; index < row.getArity(); index++) {
LOG.warn("Unknown column type for column %s. Best effort approach to set its value: %s.", index + 1, row.getField(index));
@@ -209,7 +209,7 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
throw new IllegalArgumentException("writeRecord() failed", e);
}
}
-
+
/**
* Executes prepared statement and closes all resources of this instance.
*
@@ -228,7 +228,7 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
upload = null;
batchCount = 0;
}
-
+
try {
if (dbConn != null) {
dbConn.close();
@@ -239,56 +239,59 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
dbConn = null;
}
}
-
+
public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
return new JDBCOutputFormatBuilder();
}
-
+
+ /**
+ * Builder for a {@link JDBCOutputFormat}.
+ */
public static class JDBCOutputFormatBuilder {
private final JDBCOutputFormat format;
-
+
protected JDBCOutputFormatBuilder() {
this.format = new JDBCOutputFormat();
}
-
+
public JDBCOutputFormatBuilder setUsername(String username) {
format.username = username;
return this;
}
-
+
public JDBCOutputFormatBuilder setPassword(String password) {
format.password = password;
return this;
}
-
+
public JDBCOutputFormatBuilder setDrivername(String drivername) {
format.drivername = drivername;
return this;
}
-
+
public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
format.dbURL = dbURL;
return this;
}
-
+
public JDBCOutputFormatBuilder setQuery(String query) {
format.query = query;
return this;
}
-
+
public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
format.batchInterval = batchInterval;
return this;
}
-
+
public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
format.typesArray = typesArray;
return this;
}
-
+
/**
* Finalizes the configuration and checks validity.
- *
+ *
* @return Configured JDBCOutputFormat
*/
public JDBCOutputFormat finish() {
@@ -307,9 +310,9 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
if (format.drivername == null) {
throw new IllegalArgumentException("No driver supplied");
}
-
+
return format;
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
index 2ed2f8c..c43e754 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
@@ -15,22 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.api.java.io.jdbc.split;
-import java.io.Serializable;
+package org.apache.flink.api.java.io.jdbc.split;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-/**
- *
+import java.io.Serializable;
+
+/**
* This splits generator actually does nothing but wrapping the query parameters
* computed by the user before creating the {@link JDBCInputFormat} instance.
- *
- * */
+ */
public class GenericParameterValuesProvider implements ParameterValuesProvider {
private final Serializable[][] parameters;
-
+
public GenericParameterValuesProvider(Serializable[][] parameters) {
this.parameters = parameters;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
index 4420172..4b8ecd6 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
@@ -15,36 +15,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.api.java.io.jdbc.split;
-import static org.apache.flink.util.Preconditions.checkArgument;
+package org.apache.flink.api.java.io.jdbc.split;
import java.io.Serializable;
-/**
- *
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
* This query parameters generator is an helper class to parameterize from/to queries on a numeric column.
* The generated array of from/to values will be equally sized to fetchSize (apart from the last one),
* ranging from minVal up to maxVal.
- *
- * For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>, using a query like:
+ *
+ * <p>For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>, using a query like:
* <PRE>
* SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
* </PRE>
*
- * you can take advantage of this class to automatically generate the parameters of the BETWEEN clause,
+ * <p>You can take advantage of this class to automatically generate the parameters of the BETWEEN clause,
* based on the passed constructor parameters.
- *
- * */
+ *
+ */
public class NumericBetweenParametersProvider implements ParameterValuesProvider {
private final long fetchSize;
private final long minVal;
private final long maxVal;
-
+
/**
* NumericBetweenParametersProvider constructor.
- *
+ *
* @param fetchSize the max distance between the produced from/to pairs
* @param minVal the lower bound of the produced "from" values
* @param maxVal the upper bound of the produced "to" values
@@ -72,5 +72,5 @@ public class NumericBetweenParametersProvider implements ParameterValuesProvider
}
return parameters;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
index c194497..f31c6e1 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
@@ -15,21 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.api.java.io.jdbc.split;
-import java.io.Serializable;
+package org.apache.flink.api.java.io.jdbc.split;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+import java.io.Serializable;
+
/**
- *
* This interface is used by the {@link JDBCInputFormat} to compute the list of parallel query to run (i.e. splits).
- * Each query will be parameterized using a row of the matrix provided by each {@link ParameterValuesProvider} implementation
- *
- * */
+ * Each query will be parameterized using a row of the matrix provided by each {@link ParameterValuesProvider}
+ * implementation.
+ */
public interface ParameterValuesProvider {
- /** Returns the necessary parameters array to use for query in parallel a table */
- public Serializable[][] getParameterValues();
-
+ /** Returns the matrix of parameter values used to query a table in parallel (one row per query). */
+ Serializable[][] getParameterValues();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
index 78cf69c..bd575c3 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
@@ -18,20 +18,24 @@
package org.apache.flink.api.java.io.jdbc;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.Types;
-
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
import org.apache.flink.types.Row;
+
import org.junit.Assert;
import org.junit.Test;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Types;
+
+/**
+ * Tests using both {@link JDBCInputFormat} and {@link JDBCOutputFormat}.
+ */
public class JDBCFullTest extends JDBCTestBase {
@Test
@@ -50,7 +54,7 @@ public class JDBCFullTest extends JDBCTestBase {
.setDrivername(JDBCTestBase.DRIVER_CLASS)
.setDBUrl(JDBCTestBase.DB_URL)
.setQuery(JDBCTestBase.SELECT_ALL_BOOKS)
- .setRowTypeInfo(rowTypeInfo);
+ .setRowTypeInfo(ROW_TYPE_INFO);
if (exploitParallelism) {
final int fetchSize = 1;
@@ -69,8 +73,8 @@ public class JDBCFullTest extends JDBCTestBase {
source.output(JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername(JDBCTestBase.DRIVER_CLASS)
.setDBUrl(JDBCTestBase.DB_URL)
- .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
- .setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR,Types.DOUBLE,Types.INTEGER})
+ .setQuery("insert into newbooks (id, title, author, price, qty) values (?,?,?,?,?)")
+ .setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.DOUBLE, Types.INTEGER})
.finish());
environment.execute();
http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index 3f6a87a..b1416ea 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.types.Row;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -31,6 +32,9 @@ import java.io.IOException;
import java.io.Serializable;
import java.sql.ResultSet;
+/**
+ * Tests for the {@link JDBCInputFormat}.
+ */
public class JDBCInputFormatTest extends JDBCTestBase {
private JDBCInputFormat jdbcInputFormat;
@@ -60,7 +64,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
.setDrivername("org.apache.derby.jdbc.idontexist")
.setDBUrl(DB_URL)
.setQuery(SELECT_ALL_BOOKS)
- .setRowTypeInfo(rowTypeInfo)
+ .setRowTypeInfo(ROW_TYPE_INFO)
.finish();
jdbcInputFormat.openInputFormat();
}
@@ -71,7 +75,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
.setDrivername(DRIVER_CLASS)
.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
.setQuery(SELECT_ALL_BOOKS)
- .setRowTypeInfo(rowTypeInfo)
+ .setRowTypeInfo(ROW_TYPE_INFO)
.finish();
jdbcInputFormat.openInputFormat();
}
@@ -82,7 +86,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
.setDrivername(DRIVER_CLASS)
.setDBUrl(DB_URL)
.setQuery("iamnotsql")
- .setRowTypeInfo(rowTypeInfo)
+ .setRowTypeInfo(ROW_TYPE_INFO)
.finish();
jdbcInputFormat.openInputFormat();
}
@@ -92,7 +96,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(DRIVER_CLASS)
.setQuery(SELECT_ALL_BOOKS)
- .setRowTypeInfo(rowTypeInfo)
+ .setRowTypeInfo(ROW_TYPE_INFO)
.finish();
}
@@ -102,7 +106,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
.setDrivername(DRIVER_CLASS)
.setDBUrl(DB_URL)
.setQuery(SELECT_ALL_BOOKS)
- .setRowTypeInfo(rowTypeInfo)
+ .setRowTypeInfo(ROW_TYPE_INFO)
.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
.finish();
//this query does not exploit parallelism
@@ -122,7 +126,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
jdbcInputFormat.closeInputFormat();
Assert.assertEquals(TEST_DATA.length, recordCount);
}
-
+
@Test
public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException {
final int fetchSize = 1;
@@ -133,7 +137,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
.setDrivername(DRIVER_CLASS)
.setDBUrl(DB_URL)
.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
- .setRowTypeInfo(rowTypeInfo)
+ .setRowTypeInfo(ROW_TYPE_INFO)
.setParametersProvider(pramProvider)
.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
.finish();
@@ -163,13 +167,13 @@ public class JDBCInputFormatTest extends JDBCTestBase {
public void testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException {
final long min = TEST_DATA[0].id;
final long max = TEST_DATA[TEST_DATA.length - 1].id;
- final long fetchSize = max + 1;//generate a single split
+ final long fetchSize = max + 1; //generate a single split
ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(DRIVER_CLASS)
.setDBUrl(DB_URL)
.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
- .setRowTypeInfo(rowTypeInfo)
+ .setRowTypeInfo(ROW_TYPE_INFO)
.setParametersProvider(pramProvider)
.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
.finish();
@@ -194,7 +198,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
jdbcInputFormat.closeInputFormat();
Assert.assertEquals(TEST_DATA.length, recordCount);
}
-
+
@Test
public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException {
Serializable[][] queryParameters = new String[2][1];
@@ -205,7 +209,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
.setDrivername(DRIVER_CLASS)
.setDBUrl(DB_URL)
.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
- .setRowTypeInfo(rowTypeInfo)
+ .setRowTypeInfo(ROW_TYPE_INFO)
.setParametersProvider(paramProvider)
.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
.finish();
@@ -231,21 +235,21 @@ public class JDBCInputFormatTest extends JDBCTestBase {
int id = ((int) row.getField(0));
int testDataIndex = id - 1001;
-
+
assertEquals(TEST_DATA[testDataIndex], row);
sum += id;
}
-
+
Assert.assertEquals(expectedIDSum, sum);
}
-
+
@Test
public void testEmptyResults() throws IOException {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(DRIVER_CLASS)
.setDBUrl(DB_URL)
.setQuery(SELECT_EMPTY)
- .setRowTypeInfo(rowTypeInfo)
+ .setRowTypeInfo(ROW_TYPE_INFO)
.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
.finish();
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
index a67c1ce..3f14504 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -18,6 +18,12 @@
package org.apache.flink.api.java.io.jdbc;
+import org.apache.flink.types.Row;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -25,11 +31,9 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import org.apache.flink.types.Row;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
+/**
+ * Tests for the {@link JDBCOutputFormat}.
+ */
public class JDBCOutputFormatTest extends JDBCTestBase {
private JDBCOutputFormat jdbcOutputFormat;
@@ -80,7 +84,6 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
.finish();
}
-
@Test(expected = IllegalArgumentException.class)
public void testIncompatibleTypes() throws IOException {
jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
index 13da4c7..7189393 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
@@ -15,24 +15,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.java.io.jdbc;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
/**
- * Base test class for JDBC Input and Output formats
+ * Base test class for JDBC Input and Output formats.
*/
public class JDBCTestBase {
-
+
public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
public static final String DB_URL = "jdbc:derby:memory:ebookshop";
public static final String INPUT_TABLE = "books";
@@ -43,7 +45,7 @@ public class JDBCTestBase {
public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)";
public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = SELECT_ALL_BOOKS + " WHERE author = ?";
-
+
public static final TestEntry[] TEST_DATA = {
new TestEntry(1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11),
new TestEntry(1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22),
@@ -57,13 +59,13 @@ public class JDBCTestBase {
new TestEntry(1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010)
};
- protected static class TestEntry {
+ static class TestEntry {
protected final Integer id;
protected final String title;
protected final String author;
protected final Double price;
protected final Integer qty;
-
+
private TestEntry(Integer id, String title, String author, Double price, Integer qty) {
this.id = id;
this.title = title;
@@ -73,7 +75,7 @@ public class JDBCTestBase {
}
}
- public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(
+ public static final RowTypeInfo ROW_TYPE_INFO = new RowTypeInfo(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
@@ -91,7 +93,7 @@ public class JDBCTestBase {
sqlQueryBuilder.append("PRIMARY KEY (id))");
return sqlQueryBuilder.toString();
}
-
+
public static String getInsertQuery() {
StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
for (int i = 0; i < TEST_DATA.length; i++) {
@@ -108,7 +110,7 @@ public class JDBCTestBase {
String insertQuery = sqlQueryBuilder.toString();
return insertQuery;
}
-
+
public static final OutputStream DEV_NULL = new OutputStream() {
@Override
public void write(int b) {
@@ -126,13 +128,13 @@ public class JDBCTestBase {
insertDataIntoInputTable(conn);
}
}
-
+
private static void createTable(Connection conn, String tableName) throws SQLException {
Statement stat = conn.createStatement();
stat.executeUpdate(getCreateQuery(tableName));
stat.close();
}
-
+
private static void insertDataIntoInputTable(Connection conn) throws SQLException {
Statement stat = conn.createStatement();
stat.execute(getInsertQuery());
@@ -147,7 +149,7 @@ public class JDBCTestBase {
Statement stat = conn.createStatement()) {
stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
- stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE);
+ stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties b/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties
index 2fb9345..c977d4c 100644
--- a/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties
@@ -16,4 +16,4 @@
# limitations under the License.
################################################################################
-log4j.rootLogger=OFF
\ No newline at end of file
+log4j.rootLogger=OFF