You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:34 UTC

[04/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-jdbc

[FLINK-6711] Activate strict checkstyle for flink-jdbc


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23920bb8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23920bb8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23920bb8

Branch: refs/heads/master
Commit: 23920bb88eed1f6d1bfe55fb65b5013a784930ba
Parents: d4f7339
Author: zentol <ch...@apache.org>
Authored: Wed May 24 22:55:11 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:08 2017 +0200

----------------------------------------------------------------------
 flink-connectors/flink-jdbc/pom.xml             |  6 +-
 .../flink/api/java/io/jdbc/JDBCInputFormat.java | 48 ++++++-----
 .../api/java/io/jdbc/JDBCOutputFormat.java      | 85 ++++++++++----------
 .../split/GenericParameterValuesProvider.java   | 13 ++-
 .../split/NumericBetweenParametersProvider.java | 24 +++---
 .../io/jdbc/split/ParameterValuesProvider.java  | 17 ++--
 .../flink/api/java/io/jdbc/JDBCFullTest.java    | 22 ++---
 .../api/java/io/jdbc/JDBCInputFormatTest.java   | 34 ++++----
 .../api/java/io/jdbc/JDBCOutputFormatTest.java  | 15 ++--
 .../flink/api/java/io/jdbc/JDBCTestBase.java    | 34 ++++----
 .../src/test/resources/log4j-test.properties    |  2 +-
 11 files changed, 159 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml
index a2bbaf4..0704dc8 100644
--- a/flink-connectors/flink-jdbc/pom.xml
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -18,11 +18,11 @@ specific language governing permissions and limitations
 under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	
+
 	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-	
+
 	<modelVersion>4.0.0</modelVersion>
-	
+
 	<parent>
 		<groupId>org.apache.flink</groupId>
 		<artifactId>flink-connectors</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index e714867..835fb23 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -18,38 +18,39 @@
 
 package org.apache.flink.api.java.io.jdbc;
 
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.sql.Array;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Arrays;
-
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.types.Row;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.types.Row;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+
 /**
  * InputFormat to read data from a database and generate Rows.
  * The InputFormat has to be configured using the supplied InputFormatBuilder.
- * A valid RowTypeInfo must be properly configured in the builder, e.g.: </br>
+ * A valid RowTypeInfo must be properly configured in the builder, e.g.:
  *
  * <pre><code>
  * TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
@@ -70,10 +71,10 @@ import org.slf4j.LoggerFactory;
  *				.finish();
  * </code></pre>
  *
- * In order to query the JDBC source in parallel, you need to provide a
+ * <p>In order to query the JDBC source in parallel, you need to provide a
  * parameterized query template (i.e. a valid {@link PreparedStatement}) and
  * a {@link ParameterValuesProvider} which provides binding values for the
- * query parameters. E.g.:</br>
+ * query parameters. E.g.:
  *
  * <pre><code>
  *
@@ -151,7 +152,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
 	public void closeInputFormat() {
 		//called once per inputFormat (on close)
 		try {
-			if(statement != null) {
+			if (statement != null) {
 				statement.close();
 			}
 		} catch (SQLException se) {
@@ -161,7 +162,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
 		}
 
 		try {
-			if(dbConn != null) {
+			if (dbConn != null) {
 				dbConn.close();
 			}
 		} catch (SQLException se) {
@@ -221,7 +222,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
 						statement.setArray(i + 1, (Array) param);
 					} else {
 						//extends with other types if needed
-						throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet)." );
+						throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet).");
 					}
 				}
 				if (LOG.isDebugEnabled()) {
@@ -242,7 +243,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
 	 */
 	@Override
 	public void close() throws IOException {
-		if(resultSet == null) {
+		if (resultSet == null) {
 			return;
 		}
 		try {
@@ -264,7 +265,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
 	}
 
 	/**
-	 * Stores the next resultSet row in a tuple
+	 * Stores the next resultSet row in a tuple.
 	 *
 	 * @param row row to be reused.
 	 * @return row containing next {@link Row}
@@ -319,6 +320,9 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
 		return new JDBCInputFormatBuilder();
 	}
 
+	/**
+	 * Builder for a {@link JDBCInputFormat}.
+	 */
 	public static class JDBCInputFormatBuilder {
 		private final JDBCInputFormat format;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index c5585e2..3f2ad33 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -18,52 +18,53 @@
 
 package org.apache.flink.api.java.io.jdbc;
 
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.types.Row;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
 /**
  * OutputFormat to write tuples into a database.
  * The OutputFormat has to be configured using the supplied OutputFormatBuilder.
- * 
+ *
  * @see Tuple
  * @see DriverManager
  */
 public class JDBCOutputFormat extends RichOutputFormat<Row> {
 	private static final long serialVersionUID = 1L;
-	
+
 	private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);
-	
+
 	private String username;
 	private String password;
 	private String drivername;
 	private String dbURL;
 	private String query;
 	private int batchInterval = 5000;
-	
+
 	private Connection dbConn;
 	private PreparedStatement upload;
-	
+
 	private int batchCount = 0;
-	
+
 	public int[] typesArray;
-	
+
 	public JDBCOutputFormat() {
 	}
-	
+
 	@Override
 	public void configure(Configuration parameters) {
 	}
-	
+
 	/**
 	 * Connects to the target database and initializes the prepared statement.
 	 *
@@ -82,7 +83,7 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 			throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
 		}
 	}
-	
+
 	private void establishConnection() throws SQLException, ClassNotFoundException {
 		Class.forName(drivername);
 		if (username == null) {
@@ -91,14 +92,13 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 			dbConn = DriverManager.getConnection(dbURL, username, password);
 		}
 	}
-	
+
 	/**
 	 * Adds a record to the prepared statement.
-	 * <p>
-	 * When this method is called, the output format is guaranteed to be opened.
-	 * </p>
-	 * 
-	 * WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
+	 *
+	 * <p>When this method is called, the output format is guaranteed to be opened.
+	 *
+	 * <p>WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
 	 * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null))
 	 *
 	 * @param row The records to add to the output.
@@ -110,10 +110,10 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 
 		if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
 			LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
-		} 
+		}
 		try {
 
-			if (typesArray == null ) {
+			if (typesArray == null) {
 				// no types provided
 				for (int index = 0; index < row.getArity(); index++) {
 					LOG.warn("Unknown column type for column %s. Best effort approach to set its value: %s.", index + 1, row.getField(index));
@@ -209,7 +209,7 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 			throw new IllegalArgumentException("writeRecord() failed", e);
 		}
 	}
-	
+
 	/**
 	 * Executes prepared statement and closes all resources of this instance.
 	 *
@@ -228,7 +228,7 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 			upload = null;
 			batchCount = 0;
 		}
-		
+
 		try {
 			if (dbConn != null) {
 				dbConn.close();
@@ -239,56 +239,59 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 			dbConn = null;
 		}
 	}
-	
+
 	public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
 		return new JDBCOutputFormatBuilder();
 	}
-	
+
+	/**
+	 * Builder for a {@link JDBCOutputFormat}.
+	 */
 	public static class JDBCOutputFormatBuilder {
 		private final JDBCOutputFormat format;
-		
+
 		protected JDBCOutputFormatBuilder() {
 			this.format = new JDBCOutputFormat();
 		}
-		
+
 		public JDBCOutputFormatBuilder setUsername(String username) {
 			format.username = username;
 			return this;
 		}
-		
+
 		public JDBCOutputFormatBuilder setPassword(String password) {
 			format.password = password;
 			return this;
 		}
-		
+
 		public JDBCOutputFormatBuilder setDrivername(String drivername) {
 			format.drivername = drivername;
 			return this;
 		}
-		
+
 		public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
 			format.dbURL = dbURL;
 			return this;
 		}
-		
+
 		public JDBCOutputFormatBuilder setQuery(String query) {
 			format.query = query;
 			return this;
 		}
-		
+
 		public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
 			format.batchInterval = batchInterval;
 			return this;
 		}
-		
+
 		public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
 			format.typesArray = typesArray;
 			return this;
 		}
-		
+
 		/**
 		 * Finalizes the configuration and checks validity.
-		 * 
+		 *
 		 * @return Configured JDBCOutputFormat
 		 */
 		public JDBCOutputFormat finish() {
@@ -307,9 +310,9 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 			if (format.drivername == null) {
 				throw new IllegalArgumentException("No driver supplied");
 			}
-			
+
 			return format;
 		}
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
index 2ed2f8c..c43e754 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
@@ -15,22 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.api.java.io.jdbc.split;
 
-import java.io.Serializable;
+package org.apache.flink.api.java.io.jdbc.split;
 
 import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
 
-/** 
- * 
+import java.io.Serializable;
+
+/**
  * This splits generator actually does nothing but wrapping the query parameters
  * computed by the user before creating the {@link JDBCInputFormat} instance.
- * 
- * */
+ */
 public class GenericParameterValuesProvider implements ParameterValuesProvider {
 
 	private final Serializable[][] parameters;
-	
+
 	public GenericParameterValuesProvider(Serializable[][] parameters) {
 		this.parameters = parameters;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
index 4420172..4b8ecd6 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
@@ -15,36 +15,36 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.api.java.io.jdbc.split;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
+package org.apache.flink.api.java.io.jdbc.split;
 
 import java.io.Serializable;
 
-/** 
- * 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
  * This query parameters generator is an helper class to parameterize from/to queries on a numeric column.
  * The generated array of from/to values will be equally sized to fetchSize (apart from the last one),
  * ranging from minVal up to maxVal.
- * 
- * For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>, using a query like:
+ *
+ * <p>For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>, using a query like:
  * <PRE>
  *   SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
  * </PRE>
  *
- * you can take advantage of this class to automatically generate the parameters of the BETWEEN clause,
+ * <p>You can take advantage of this class to automatically generate the parameters of the BETWEEN clause,
  * based on the passed constructor parameters.
- * 
- * */
+ *
+ */
 public class NumericBetweenParametersProvider implements ParameterValuesProvider {
 
 	private final long fetchSize;
 	private final long minVal;
 	private final long maxVal;
-	
+
 	/**
 	 * NumericBetweenParametersProvider constructor.
-	 * 
+	 *
 	 * @param fetchSize the max distance between the produced from/to pairs
 	 * @param minVal the lower bound of the produced "from" values
 	 * @param maxVal the upper bound of the produced "to" values
@@ -72,5 +72,5 @@ public class NumericBetweenParametersProvider implements ParameterValuesProvider
 		}
 		return parameters;
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
index c194497..f31c6e1 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
@@ -15,21 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.api.java.io.jdbc.split;
 
-import java.io.Serializable;
+package org.apache.flink.api.java.io.jdbc.split;
 
 import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
 
+import java.io.Serializable;
+
 /**
- * 
  * This interface is used by the {@link JDBCInputFormat} to compute the list of parallel query to run (i.e. splits).
- * Each query will be parameterized using a row of the matrix provided by each {@link ParameterValuesProvider} implementation
- * 
- * */
+ * Each query will be parameterized using a row of the matrix provided by each {@link ParameterValuesProvider}
+ * implementation.
+ */
 public interface ParameterValuesProvider {
 
-	/** Returns the necessary parameters array to use for query in parallel a table */
-	public Serializable[][] getParameterValues();
-	
+	/** Returns the necessary parameters array to use for query in parallel a table. */
+	Serializable[][] getParameterValues();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
index 78cf69c..bd575c3 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
@@ -18,20 +18,24 @@
 
 package org.apache.flink.api.java.io.jdbc;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.Types;
-
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
 import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
 import org.apache.flink.types.Row;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Types;
+
+/**
+ * Tests using both {@link JDBCInputFormat} and {@link JDBCOutputFormat}.
+ */
 public class JDBCFullTest extends JDBCTestBase {
 
 	@Test
@@ -50,7 +54,7 @@ public class JDBCFullTest extends JDBCTestBase {
 				.setDrivername(JDBCTestBase.DRIVER_CLASS)
 				.setDBUrl(JDBCTestBase.DB_URL)
 				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS)
-				.setRowTypeInfo(rowTypeInfo);
+				.setRowTypeInfo(ROW_TYPE_INFO);
 
 		if (exploitParallelism) {
 			final int fetchSize = 1;
@@ -69,8 +73,8 @@ public class JDBCFullTest extends JDBCTestBase {
 		source.output(JDBCOutputFormat.buildJDBCOutputFormat()
 				.setDrivername(JDBCTestBase.DRIVER_CLASS)
 				.setDBUrl(JDBCTestBase.DB_URL)
-				.setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
-				.setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR,Types.DOUBLE,Types.INTEGER})
+				.setQuery("insert into newbooks (id, title, author, price, qty) values (?,?,?,?,?)")
+				.setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.DOUBLE, Types.INTEGER})
 				.finish());
 
 		environment.execute();

http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index 3f6a87a..b1416ea 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
 import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.types.Row;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -31,6 +32,9 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.sql.ResultSet;
 
+/**
+ * Tests for the {@link JDBCInputFormat}.
+ */
 public class JDBCInputFormatTest extends JDBCTestBase {
 
 	private JDBCInputFormat jdbcInputFormat;
@@ -60,7 +64,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 				.setDrivername("org.apache.derby.jdbc.idontexist")
 				.setDBUrl(DB_URL)
 				.setQuery(SELECT_ALL_BOOKS)
-				.setRowTypeInfo(rowTypeInfo)
+				.setRowTypeInfo(ROW_TYPE_INFO)
 				.finish();
 		jdbcInputFormat.openInputFormat();
 	}
@@ -71,7 +75,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 				.setDrivername(DRIVER_CLASS)
 				.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
 				.setQuery(SELECT_ALL_BOOKS)
-				.setRowTypeInfo(rowTypeInfo)
+				.setRowTypeInfo(ROW_TYPE_INFO)
 				.finish();
 		jdbcInputFormat.openInputFormat();
 	}
@@ -82,7 +86,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 				.setDrivername(DRIVER_CLASS)
 				.setDBUrl(DB_URL)
 				.setQuery("iamnotsql")
-				.setRowTypeInfo(rowTypeInfo)
+				.setRowTypeInfo(ROW_TYPE_INFO)
 				.finish();
 		jdbcInputFormat.openInputFormat();
 	}
@@ -92,7 +96,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
 				.setDrivername(DRIVER_CLASS)
 				.setQuery(SELECT_ALL_BOOKS)
-				.setRowTypeInfo(rowTypeInfo)
+				.setRowTypeInfo(ROW_TYPE_INFO)
 				.finish();
 	}
 
@@ -102,7 +106,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 				.setDrivername(DRIVER_CLASS)
 				.setDBUrl(DB_URL)
 				.setQuery(SELECT_ALL_BOOKS)
-				.setRowTypeInfo(rowTypeInfo)
+				.setRowTypeInfo(ROW_TYPE_INFO)
 				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
 				.finish();
 		//this query does not exploit parallelism
@@ -122,7 +126,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 		jdbcInputFormat.closeInputFormat();
 		Assert.assertEquals(TEST_DATA.length, recordCount);
 	}
-	
+
 	@Test
 	public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException {
 		final int fetchSize = 1;
@@ -133,7 +137,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 				.setDrivername(DRIVER_CLASS)
 				.setDBUrl(DB_URL)
 				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
-				.setRowTypeInfo(rowTypeInfo)
+				.setRowTypeInfo(ROW_TYPE_INFO)
 				.setParametersProvider(pramProvider)
 				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
 				.finish();
@@ -163,13 +167,13 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 	public void testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException {
 		final long min = TEST_DATA[0].id;
 		final long max = TEST_DATA[TEST_DATA.length - 1].id;
-		final long fetchSize = max + 1;//generate a single split
+		final long fetchSize = max + 1; //generate a single split
 		ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
 				.setDrivername(DRIVER_CLASS)
 				.setDBUrl(DB_URL)
 				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
-				.setRowTypeInfo(rowTypeInfo)
+				.setRowTypeInfo(ROW_TYPE_INFO)
 				.setParametersProvider(pramProvider)
 				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
 				.finish();
@@ -194,7 +198,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 		jdbcInputFormat.closeInputFormat();
 		Assert.assertEquals(TEST_DATA.length, recordCount);
 	}
-	
+
 	@Test
 	public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException {
 		Serializable[][] queryParameters = new String[2][1];
@@ -205,7 +209,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 				.setDrivername(DRIVER_CLASS)
 				.setDBUrl(DB_URL)
 				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
-				.setRowTypeInfo(rowTypeInfo)
+				.setRowTypeInfo(ROW_TYPE_INFO)
 				.setParametersProvider(paramProvider)
 				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
 				.finish();
@@ -231,21 +235,21 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 
 			int id = ((int) row.getField(0));
 			int testDataIndex = id - 1001;
-			
+
 			assertEquals(TEST_DATA[testDataIndex], row);
 			sum += id;
 		}
-		
+
 		Assert.assertEquals(expectedIDSum, sum);
 	}
-	
+
 	@Test
 	public void testEmptyResults() throws IOException {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
 				.setDrivername(DRIVER_CLASS)
 				.setDBUrl(DB_URL)
 				.setQuery(SELECT_EMPTY)
-				.setRowTypeInfo(rowTypeInfo)
+				.setRowTypeInfo(ROW_TYPE_INFO)
 				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
 				.finish();
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
index a67c1ce..3f14504 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -18,6 +18,12 @@
 
 package org.apache.flink.api.java.io.jdbc;
 
+import org.apache.flink.types.Row;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -25,11 +31,9 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 
-import org.apache.flink.types.Row;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
+/**
+ * Tests for the {@link JDBCOutputFormat}.
+ */
 public class JDBCOutputFormatTest extends JDBCTestBase {
 
 	private JDBCOutputFormat jdbcOutputFormat;
@@ -80,7 +84,6 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 				.finish();
 	}
 
-
 	@Test(expected = IllegalArgumentException.class)
 	public void testIncompatibleTypes() throws IOException {
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()

http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
index 13da4c7..7189393 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
@@ -15,24 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.io.jdbc;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
 import java.io.OutputStream;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
 /**
- * Base test class for JDBC Input and Output formats
+ * Base test class for JDBC Input and Output formats.
  */
 public class JDBCTestBase {
-	
+
 	public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
 	public static final String DB_URL = "jdbc:derby:memory:ebookshop";
 	public static final String INPUT_TABLE = "books";
@@ -43,7 +45,7 @@ public class JDBCTestBase {
 	public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)";
 	public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
 	public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = SELECT_ALL_BOOKS + " WHERE author = ?";
-	
+
 	public static final TestEntry[] TEST_DATA = {
 			new TestEntry(1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11),
 			new TestEntry(1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22),
@@ -57,13 +59,13 @@ public class JDBCTestBase {
 			new TestEntry(1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010)
 	};
 
-	protected static class TestEntry {
+	static class TestEntry {
 		protected final Integer id;
 		protected final String title;
 		protected final String author;
 		protected final Double price;
 		protected final Integer qty;
-		
+
 		private TestEntry(Integer id, String title, String author, Double price, Integer qty) {
 			this.id = id;
 			this.title = title;
@@ -73,7 +75,7 @@ public class JDBCTestBase {
 		}
 	}
 
-	public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(
+	public static final RowTypeInfo ROW_TYPE_INFO = new RowTypeInfo(
 		BasicTypeInfo.INT_TYPE_INFO,
 		BasicTypeInfo.STRING_TYPE_INFO,
 		BasicTypeInfo.STRING_TYPE_INFO,
@@ -91,7 +93,7 @@ public class JDBCTestBase {
 		sqlQueryBuilder.append("PRIMARY KEY (id))");
 		return sqlQueryBuilder.toString();
 	}
-	
+
 	public static String getInsertQuery() {
 		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
 		for (int i = 0; i < TEST_DATA.length; i++) {
@@ -108,7 +110,7 @@ public class JDBCTestBase {
 		String insertQuery = sqlQueryBuilder.toString();
 		return insertQuery;
 	}
-	
+
 	public static final OutputStream DEV_NULL = new OutputStream() {
 		@Override
 		public void write(int b) {
@@ -126,13 +128,13 @@ public class JDBCTestBase {
 			insertDataIntoInputTable(conn);
 		}
 	}
-	
+
 	private static void createTable(Connection conn, String tableName) throws SQLException {
 		Statement stat = conn.createStatement();
 		stat.executeUpdate(getCreateQuery(tableName));
 		stat.close();
 	}
-	
+
 	private static void insertDataIntoInputTable(Connection conn) throws SQLException {
 		Statement stat = conn.createStatement();
 		stat.execute(getInsertQuery());
@@ -147,7 +149,7 @@ public class JDBCTestBase {
 			Statement stat = conn.createStatement()) {
 
 			stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
-			stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE);	
+			stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/23920bb8/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties b/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties
index 2fb9345..c977d4c 100644
--- a/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties
@@ -16,4 +16,4 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF
\ No newline at end of file
+log4j.rootLogger=OFF