Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:36 UTC

[46/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
deleted file mode 100644
index b4246f5..0000000
--- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.sql.Array;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Arrays;
-
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * InputFormat to read data from a database and generate Rows.
- * The InputFormat has to be configured using the supplied InputFormatBuilder.
- * A valid RowTypeInfo must be properly configured in the builder, e.g.:
- *
- * <pre><code>
- * TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
- *		BasicTypeInfo.INT_TYPE_INFO,
- *		BasicTypeInfo.STRING_TYPE_INFO,
- *		BasicTypeInfo.STRING_TYPE_INFO,
- *		BasicTypeInfo.DOUBLE_TYPE_INFO,
- *		BasicTypeInfo.INT_TYPE_INFO
- *	};
- *
- * RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
- *
- * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- *				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
- *				.setDBUrl("jdbc:derby:memory:ebookshop")
- *				.setQuery("select * from books")
- *				.setRowTypeInfo(rowTypeInfo)
- *				.finish();
- * </code></pre>
- *
- * In order to query the JDBC source in parallel, you need to provide a
- * parameterized query template (i.e. a valid {@link PreparedStatement}) and
- * a {@link ParameterValuesProvider} which provides binding values for the
- * query parameters. E.g.:
- *
- * <pre><code>
- *
- * Serializable[][] queryParameters = new String[2][1];
- * queryParameters[0] = new String[]{"Kumar"};
- * queryParameters[1] = new String[]{"Tan Ah Teck"};
- *
- * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- *				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
- *				.setDBUrl("jdbc:derby:memory:ebookshop")
- *				.setQuery("select * from books WHERE author = ?")
- *				.setRowTypeInfo(rowTypeInfo)
- *				.setParametersProvider(new GenericParameterValuesProvider(queryParameters))
- *				.finish();
- * </code></pre>
- *
- * @see Row
- * @see ParameterValuesProvider
- * @see PreparedStatement
- * @see DriverManager
- */
-public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements ResultTypeQueryable<Row> {
-
-	private static final long serialVersionUID = 1L;
-	private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
-
-	private String username;
-	private String password;
-	private String drivername;
-	private String dbURL;
-	private String queryTemplate;
-	private int resultSetType;
-	private int resultSetConcurrency;
-	private RowTypeInfo rowTypeInfo;
-
-	private transient Connection dbConn;
-	private transient PreparedStatement statement;
-	private transient ResultSet resultSet;
-
-	private boolean hasNext;
-	private Object[][] parameterValues;
-
-	public JDBCInputFormat() {
-	}
-
-	@Override
-	public RowTypeInfo getProducedType() {
-		return rowTypeInfo;
-	}
-
-	@Override
-	public void configure(Configuration parameters) {
-		//do nothing here
-	}
-
-	@Override
-	public void openInputFormat() {
-		//called once per inputFormat (on open)
-		try {
-			Class.forName(drivername);
-			if (username == null) {
-				dbConn = DriverManager.getConnection(dbURL);
-			} else {
-				dbConn = DriverManager.getConnection(dbURL, username, password);
-			}
-			statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
-		} catch (SQLException se) {
-			throw new IllegalArgumentException("open() failed. " + se.getMessage(), se);
-		} catch (ClassNotFoundException cnfe) {
-			throw new IllegalArgumentException("JDBC driver class not found. - " + cnfe.getMessage(), cnfe);
-		}
-	}
-
-	@Override
-	public void closeInputFormat() {
-		//called once per inputFormat (on close)
-		try {
-			if (statement != null) {
-				statement.close();
-			}
-		} catch (SQLException se) {
-			LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage());
-		} finally {
-			statement = null;
-		}
-
-		try {
-			if (dbConn != null) {
-				dbConn.close();
-			}
-		} catch (SQLException se) {
-			LOG.info("Inputformat connection couldn't be closed - " + se.getMessage());
-		} finally {
-			dbConn = null;
-		}
-
-		parameterValues = null;
-	}
-
-	/**
-	 * Connects to the source database and executes the query in a <b>parallel
-	 * fashion</b> if
-	 * this {@link InputFormat} is built using a parameterized query (i.e. using
-	 * a {@link PreparedStatement})
-	 * and a proper {@link ParameterValuesProvider}, in a <b>non-parallel
-	 * fashion</b> otherwise.
-	 *
-	 * @param inputSplit which is ignored if this InputFormat is executed as a
-	 *        non-parallel source,
-	 *        a "hook" to the query parameters otherwise (using its
-	 *        <i>splitNumber</i>)
-	 * @throws IOException if there's an error during the execution of the query
-	 */
-	@Override
-	public void open(InputSplit inputSplit) throws IOException {
-		try {
-			if (inputSplit != null && parameterValues != null) {
-				for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
-					Object param = parameterValues[inputSplit.getSplitNumber()][i];
-					if (param instanceof String) {
-						statement.setString(i + 1, (String) param);
-					} else if (param instanceof Long) {
-						statement.setLong(i + 1, (Long) param);
-					} else if (param instanceof Integer) {
-						statement.setInt(i + 1, (Integer) param);
-					} else if (param instanceof Double) {
-						statement.setDouble(i + 1, (Double) param);
-					} else if (param instanceof Boolean) {
-						statement.setBoolean(i + 1, (Boolean) param);
-					} else if (param instanceof Float) {
-						statement.setFloat(i + 1, (Float) param);
-					} else if (param instanceof BigDecimal) {
-						statement.setBigDecimal(i + 1, (BigDecimal) param);
-					} else if (param instanceof Byte) {
-						statement.setByte(i + 1, (Byte) param);
-					} else if (param instanceof Short) {
-						statement.setShort(i + 1, (Short) param);
-					} else if (param instanceof Date) {
-						statement.setDate(i + 1, (Date) param);
-					} else if (param instanceof Time) {
-						statement.setTime(i + 1, (Time) param);
-					} else if (param instanceof Timestamp) {
-						statement.setTimestamp(i + 1, (Timestamp) param);
-					} else if (param instanceof Array) {
-						statement.setArray(i + 1, (Array) param);
-					} else {
-						//extends with other types if needed
-						throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet)." );
-					}
-				}
-				if (LOG.isDebugEnabled()) {
-					LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
-				}
-			}
-			resultSet = statement.executeQuery();
-			hasNext = resultSet.next();
-		} catch (SQLException se) {
-			throw new IllegalArgumentException("open() failed. " + se.getMessage(), se);
-		}
-	}
-
-	/**
-	 * Closes all resources used.
-	 *
-	 * @throws IOException Indicates that a resource could not be closed.
-	 */
-	@Override
-	public void close() throws IOException {
-		if (resultSet == null) {
-			return;
-		}
-		try {
-			resultSet.close();
-		} catch (SQLException se) {
-			LOG.info("Inputformat ResultSet couldn't be closed - " + se.getMessage());
-		}
-	}
-
-	/**
-	 * Checks whether all data has been read.
-	 *
-	 * @return boolean value indicating whether all data has been read.
-	 * @throws IOException
-	 */
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return !hasNext;
-	}
-
-	/**
-	 * Stores the next resultSet row in a {@link Row}.
-	 *
-	 * @param row row to be reused.
-	 * @return the row containing the next record
-	 * @throws java.io.IOException
-	 */
-	@Override
-	public Row nextRecord(Row row) throws IOException {
-		try {
-			if (!hasNext) {
-				return null;
-			}
-			for (int pos = 0; pos < row.productArity(); pos++) {
-				row.setField(pos, resultSet.getObject(pos + 1));
-			}
-			//update hasNext after we've read the record
-			hasNext = resultSet.next();
-			return row;
-		} catch (SQLException se) {
-			throw new IOException("Couldn't read data - " + se.getMessage(), se);
-		} catch (NullPointerException npe) {
-			throw new IOException("Couldn't access resultSet", npe);
-		}
-	}
-
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
-		return cachedStatistics;
-	}
-
-	@Override
-	public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
-		if (parameterValues == null) {
-			return new GenericInputSplit[]{new GenericInputSplit(0, 1)};
-		}
-		GenericInputSplit[] ret = new GenericInputSplit[parameterValues.length];
-		for (int i = 0; i < ret.length; i++) {
-			ret[i] = new GenericInputSplit(i, ret.length);
-		}
-		return ret;
-	}
-
-	@Override
-	public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
-		return new DefaultInputSplitAssigner(inputSplits);
-	}
-
-	/**
-	 * A builder used to set parameters to the input format's configuration in a fluent way.
-	 * @return builder
-	 */
-	public static JDBCInputFormatBuilder buildJDBCInputFormat() {
-		return new JDBCInputFormatBuilder();
-	}
-
-	public static class JDBCInputFormatBuilder {
-		private final JDBCInputFormat format;
-
-		public JDBCInputFormatBuilder() {
-			this.format = new JDBCInputFormat();
-			//using TYPE_FORWARD_ONLY for high performance reads
-			this.format.resultSetType = ResultSet.TYPE_FORWARD_ONLY;
-			this.format.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
-		}
-
-		public JDBCInputFormatBuilder setUsername(String username) {
-			format.username = username;
-			return this;
-		}
-
-		public JDBCInputFormatBuilder setPassword(String password) {
-			format.password = password;
-			return this;
-		}
-
-		public JDBCInputFormatBuilder setDrivername(String drivername) {
-			format.drivername = drivername;
-			return this;
-		}
-
-		public JDBCInputFormatBuilder setDBUrl(String dbURL) {
-			format.dbURL = dbURL;
-			return this;
-		}
-
-		public JDBCInputFormatBuilder setQuery(String query) {
-			format.queryTemplate = query;
-			return this;
-		}
-
-		public JDBCInputFormatBuilder setResultSetType(int resultSetType) {
-			format.resultSetType = resultSetType;
-			return this;
-		}
-
-		public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency) {
-			format.resultSetConcurrency = resultSetConcurrency;
-			return this;
-		}
-
-		public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider) {
-			format.parameterValues = parameterValuesProvider.getParameterValues();
-			return this;
-		}
-
-		public JDBCInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo) {
-			format.rowTypeInfo = rowTypeInfo;
-			return this;
-		}
-
-		public JDBCInputFormat finish() {
-			if (format.username == null) {
-				LOG.info("Username was not supplied separately.");
-			}
-			if (format.password == null) {
-				LOG.info("Password was not supplied separately.");
-			}
-			if (format.dbURL == null) {
-				throw new IllegalArgumentException("No database URL supplied");
-			}
-			if (format.queryTemplate == null) {
-				throw new IllegalArgumentException("No query supplied");
-			}
-			if (format.drivername == null) {
-				throw new IllegalArgumentException("No driver supplied");
-			}
-			if (format.rowTypeInfo == null) {
-				throw new IllegalArgumentException("No " + RowTypeInfo.class.getSimpleName() + " supplied");
-			}
-			if (format.parameterValues == null) {
-				LOG.debug("No input splitting configured (data will be read with parallelism 1).");
-			}
-			return format;
-		}
-
-	}
-
-}
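
For reference, a minimal sketch of this format wired into a DataSet program, mirroring the
javadoc above (the Derby driver, URL, and five-column layout are the values used by the tests
in this commit, not requirements):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO
    };

    DataSet<Row> books = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
        .setDBUrl("jdbc:derby:memory:ebookshop")
        .setQuery("select * from books")
        .setRowTypeInfo(new RowTypeInfo(fieldTypes))
        .finish());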

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
deleted file mode 100644
index da4b1ad..0000000
--- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.apache.flink.api.common.io.RichOutputFormat;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * OutputFormat to write {@link Row Rows} into a database.
- * The OutputFormat has to be configured using the supplied OutputFormatBuilder.
- *
- * @see Row
- * @see DriverManager
- */
-public class JDBCOutputFormat extends RichOutputFormat<Row> {
-	private static final long serialVersionUID = 1L;
-	
-	private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);
-	
-	private String username;
-	private String password;
-	private String drivername;
-	private String dbURL;
-	private String query;
-	private int batchInterval = 5000;
-	
-	private Connection dbConn;
-	private PreparedStatement upload;
-	
-	private int batchCount = 0;
-	
-	public int[] typesArray;
-	
-	public JDBCOutputFormat() {
-	}
-	
-	@Override
-	public void configure(Configuration parameters) {
-	}
-	
-	/**
-	 * Connects to the target database and initializes the prepared statement.
-	 *
-	 * @param taskNumber The number of the parallel instance.
-	 * @throws IOException Thrown, if the output could not be opened due to an
-	 * I/O problem.
-	 */
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		try {
-			establishConnection();
-			upload = dbConn.prepareStatement(query);
-		} catch (SQLException sqe) {
-			throw new IllegalArgumentException("open() failed.", sqe);
-		} catch (ClassNotFoundException cnfe) {
-			throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
-		}
-	}
-	
-	private void establishConnection() throws SQLException, ClassNotFoundException {
-		Class.forName(drivername);
-		if (username == null) {
-			dbConn = DriverManager.getConnection(dbURL);
-		} else {
-			dbConn = DriverManager.getConnection(dbURL, username, password);
-		}
-	}
-	
-	/**
-	 * Adds a record to the prepared statement.
-	 * <p>
-	 * When this method is called, the output format is guaranteed to be opened.
-	 * </p>
-	 * 
-	 * WARNING: this may fail when no column types are specified (a best-effort approach is
-	 * attempted in order to insert a null value, but it is not guaranteed that the JDBC driver
-	 * handles PreparedStatement.setObject(pos, null)).
-	 *
-	 * @param row The record to add to the output.
-	 * @see PreparedStatement
-	 * @throws IOException Thrown, if the records could not be added due to an I/O problem.
-	 */
-	@Override
-	public void writeRecord(Row row) throws IOException {
-
-		if (typesArray != null && typesArray.length > 0 && typesArray.length != row.productArity()) {
-			LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
-		} 
-		try {
-
-			if (typesArray == null) {
-				// no types provided
-				for (int index = 0; index < row.productArity(); index++) {
-					LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.", index + 1, row.productElement(index));
-					upload.setObject(index + 1, row.productElement(index));
-				}
-			} else {
-				// types provided
-				for (int index = 0; index < row.productArity(); index++) {
-
-					if (row.productElement(index) == null) {
-						upload.setNull(index + 1, typesArray[index]);
-					} else {
-						// casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
-						switch (typesArray[index]) {
-							case java.sql.Types.NULL:
-								upload.setNull(index + 1, typesArray[index]);
-								break;
-							case java.sql.Types.BOOLEAN:
-							case java.sql.Types.BIT:
-								upload.setBoolean(index + 1, (boolean) row.productElement(index));
-								break;
-							case java.sql.Types.CHAR:
-							case java.sql.Types.NCHAR:
-							case java.sql.Types.VARCHAR:
-							case java.sql.Types.LONGVARCHAR:
-							case java.sql.Types.LONGNVARCHAR:
-								upload.setString(index + 1, (String) row.productElement(index));
-								break;
-							case java.sql.Types.TINYINT:
-								upload.setByte(index + 1, (byte) row.productElement(index));
-								break;
-							case java.sql.Types.SMALLINT:
-								upload.setShort(index + 1, (short) row.productElement(index));
-								break;
-							case java.sql.Types.INTEGER:
-								upload.setInt(index + 1, (int) row.productElement(index));
-								break;
-							case java.sql.Types.BIGINT:
-								upload.setLong(index + 1, (long) row.productElement(index));
-								break;
-							case java.sql.Types.REAL:
-								upload.setFloat(index + 1, (float) row.productElement(index));
-								break;
-							case java.sql.Types.FLOAT:
-							case java.sql.Types.DOUBLE:
-								upload.setDouble(index + 1, (double) row.productElement(index));
-								break;
-							case java.sql.Types.DECIMAL:
-							case java.sql.Types.NUMERIC:
-								upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.productElement(index));
-								break;
-							case java.sql.Types.DATE:
-								upload.setDate(index + 1, (java.sql.Date) row.productElement(index));
-								break;
-							case java.sql.Types.TIME:
-								upload.setTime(index + 1, (java.sql.Time) row.productElement(index));
-								break;
-							case java.sql.Types.TIMESTAMP:
-								upload.setTimestamp(index + 1, (java.sql.Timestamp) row.productElement(index));
-								break;
-							case java.sql.Types.BINARY:
-							case java.sql.Types.VARBINARY:
-							case java.sql.Types.LONGVARBINARY:
-								upload.setBytes(index + 1, (byte[]) row.productElement(index));
-								break;
-							default:
-								upload.setObject(index + 1, row.productElement(index));
-								LOG.warn("Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.",
-									typesArray[index], index + 1, row.productElement(index));
-								// case java.sql.Types.SQLXML
-								// case java.sql.Types.ARRAY:
-								// case java.sql.Types.JAVA_OBJECT:
-								// case java.sql.Types.BLOB:
-								// case java.sql.Types.CLOB:
-								// case java.sql.Types.NCLOB:
-								// case java.sql.Types.DATALINK:
-								// case java.sql.Types.DISTINCT:
-								// case java.sql.Types.OTHER:
-								// case java.sql.Types.REF:
-								// case java.sql.Types.ROWID:
-								// case java.sql.Types.STRUCT:
-						}
-					}
-				}
-			}
-			upload.addBatch();
-			batchCount++;
-			if (batchCount >= batchInterval) {
-				upload.executeBatch();
-				batchCount = 0;
-			}
-		} catch (SQLException | IllegalArgumentException e) {
-			throw new IllegalArgumentException("writeRecord() failed", e);
-		}
-	}
-	
-	/**
-	 * Executes prepared statement and closes all resources of this instance.
-	 *
-	 * @throws IOException Thrown, if the input could not be closed properly.
-	 */
-	@Override
-	public void close() throws IOException {
-		try {
-			if (upload != null) {
-				upload.executeBatch();
-				upload.close();
-			}
-		} catch (SQLException se) {
-			LOG.info("JDBC statement couldn't be closed - " + se.getMessage());
-		} finally {
-			upload = null;
-			batchCount = 0;
-		}
-		
-		try {
-			if (dbConn != null) {
-				dbConn.close();
-			}
-		} catch (SQLException se) {
-			LOG.info("JDBC connection couldn't be closed - " + se.getMessage());
-		} finally {
-			dbConn = null;
-		}
-	}
-	
-	public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
-		return new JDBCOutputFormatBuilder();
-	}
-	
-	public static class JDBCOutputFormatBuilder {
-		private final JDBCOutputFormat format;
-		
-		protected JDBCOutputFormatBuilder() {
-			this.format = new JDBCOutputFormat();
-		}
-		
-		public JDBCOutputFormatBuilder setUsername(String username) {
-			format.username = username;
-			return this;
-		}
-		
-		public JDBCOutputFormatBuilder setPassword(String password) {
-			format.password = password;
-			return this;
-		}
-		
-		public JDBCOutputFormatBuilder setDrivername(String drivername) {
-			format.drivername = drivername;
-			return this;
-		}
-		
-		public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
-			format.dbURL = dbURL;
-			return this;
-		}
-		
-		public JDBCOutputFormatBuilder setQuery(String query) {
-			format.query = query;
-			return this;
-		}
-		
-		public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
-			format.batchInterval = batchInterval;
-			return this;
-		}
-		
-		public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
-			format.typesArray = typesArray;
-			return this;
-		}
-		
-		/**
-		 * Finalizes the configuration and checks validity.
-		 * 
-		 * @return Configured JDBCOutputFormat
-		 */
-		public JDBCOutputFormat finish() {
-			if (format.username == null) {
-				LOG.info("Username was not supplied separately.");
-			}
-			if (format.password == null) {
-				LOG.info("Password was not supplied separately.");
-			}
-			if (format.dbURL == null) {
-				throw new IllegalArgumentException("No database URL supplied.");
-			}
-			if (format.query == null) {
-				throw new IllegalArgumentException("No query supplied.");
-			}
-			if (format.drivername == null) {
-				throw new IllegalArgumentException("No driver supplied");
-			}
-			
-			return format;
-		}
-	}
-	
-}
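
As the writeRecord() warning notes, supplying explicit SQL types via setSqlTypes() avoids
relying on PreparedStatement.setObject(pos, null) for null fields. A minimal sketch, reusing
the insert statement and column types from the tests in this commit (books is the DataSet<Row>
from the input sketch above):

    books.output(JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
        .setDBUrl("jdbc:derby:memory:ebookshop")
        .setQuery("insert into newbooks (id, title, author, price, qty) values (?, ?, ?, ?, ?)")
        .setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.DOUBLE, Types.INTEGER})
        .setBatchInterval(5000) // rows are buffered and flushed with executeBatch(); 5000 is the default
        .finish());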

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
deleted file mode 100644
index 2ed2f8c..0000000
--- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.io.jdbc.split;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-
-/**
- * This splits generator does nothing but wrap the query parameters
- * computed by the user before the {@link JDBCInputFormat} instance is created.
- */
-public class GenericParameterValuesProvider implements ParameterValuesProvider {
-
-	private final Serializable[][] parameters;
-	
-	public GenericParameterValuesProvider(Serializable[][] parameters) {
-		this.parameters = parameters;
-	}
-
-	@Override
-	public Serializable[][] getParameterValues(){
-		//do nothing...precomputed externally
-		return parameters;
-	}
-
-}
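
In use, each row of the wrapped matrix parameterizes one input split; a short sketch with the
author values from the tests in this commit:

    Serializable[][] queryParameters = new String[2][1];
    queryParameters[0] = new String[]{"Kumar"};
    queryParameters[1] = new String[]{"Tan Ah Teck"};
    // two parameter rows -> JDBCInputFormat.createInputSplits() yields two splits
    ParameterValuesProvider provider = new GenericParameterValuesProvider(queryParameters);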

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
deleted file mode 100644
index ac56b98..0000000
--- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.io.jdbc.split;
-
-import java.io.Serializable;
-
-/**
- * This query parameters generator assumes that the query to parameterize contains a
- * BETWEEN constraint on a numeric column. Each generated parameter range spans
- * fetchSize values (except possibly the last range), covering the interval from the
- * configured min value up to the max.
- *
- * For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>, using a query like:
- * <PRE>
- *   SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
- * </PRE>
- *
- * you can use this class to automatically generate the parameters of the BETWEEN clause,
- * based on the passed constructor parameters.
- */
-public class NumericBetweenParametersProvider implements ParameterValuesProvider {
-
-	private final long fetchSize;
-	private final long min;
-	private final long max;
-	
-	public NumericBetweenParametersProvider(long fetchSize, long min, long max) {
-		this.fetchSize = fetchSize;
-		this.min = min;
-		this.max = max;
-	}
-
-	@Override
-	public Serializable[][] getParameterValues() {
-		double maxElemCount = (max - min) + 1;
-		int size = (int) Math.ceil(maxElemCount / fetchSize);
-		Serializable[][] parameters = new Serializable[size][2];
-		long current = min;
-		for (int i = 0; i < size; i++) {
-			//each range spans fetchSize values, capped at max
-			long currentLimit = Math.min(current + fetchSize - 1, max);
-			parameters[i] = new Long[]{current, currentLimit};
-			current = currentLimit + 1;
-		}
-		return parameters;
-	}
-	
-}
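
To make the splitting concrete, a short worked example of the parameter ranges produced (the
ids are taken from the test data in this commit):

    // min = 1001, max = 1010, fetchSize = 4 -> ceil(10 / 4) = 3 splits:
    //   {1001L, 1004L}, {1005L, 1008L}, {1009L, 1010L}
    Serializable[][] params =
        new NumericBetweenParametersProvider(4, 1001, 1010).getParameterValues();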

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
deleted file mode 100644
index c194497..0000000
--- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.io.jdbc.split;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-
-/**
- * This interface is used by the {@link JDBCInputFormat} to compute the list of parallel queries to run (i.e. splits).
- * Each query will be parameterized using a row of the matrix provided by each {@link ParameterValuesProvider} implementation.
- */
-public interface ParameterValuesProvider {
-
-	/** Returns the parameters array to be used to query a table in parallel. */
-	public Serializable[][] getParameterValues();
-	
-}
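
Beyond the two implementations above, custom providers are straightforward to write; a minimal
hypothetical sketch (not part of this commit) generating one split per author, e.g. for the
"WHERE author = ?" query used in the tests:

    public class AuthorsParameterProvider implements ParameterValuesProvider {

        private final String[] authors;

        public AuthorsParameterProvider(String[] authors) {
            this.authors = authors;
        }

        @Override
        public Serializable[][] getParameterValues() {
            // one single-element parameter row per author -> one split per author
            Serializable[][] parameters = new Serializable[authors.length][1];
            for (int i = 0; i < authors.length; i++) {
                parameters[i] = new String[]{authors[i]};
            }
            return parameters;
        }
    }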

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
deleted file mode 100644
index da9469b..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Types;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
-import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
-import org.apache.flink.api.table.Row;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class JDBCFullTest extends JDBCTestBase {
-
-	@Test
-	public void testJdbcInOut() throws Exception {
-		//run without parallelism
-		runTest(false);
-
-		//cleanup
-		JDBCTestBase.tearDownClass();
-		JDBCTestBase.prepareTestDb();
-		
-		//run exploiting parallelism
-		runTest(true);
-		
-	}
-
-	private void runTest(boolean exploitParallelism) {
-		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
-		JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(JDBCTestBase.DRIVER_CLASS)
-				.setDBUrl(JDBCTestBase.DB_URL)
-				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS)
-				.setRowTypeInfo(rowTypeInfo);
-
-		if (exploitParallelism) {
-			final int fetchSize = 1;
-			final long min = Long.parseLong(JDBCTestBase.testData[0][0].toString());
-			final long max = Long.parseLong(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0].toString());
-			//use a "splittable" query to exploit parallelism
-			inputBuilder = inputBuilder
-					.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
-					.setParametersProvider(new NumericBetweenParametersProvider(fetchSize, min, max));
-		}
-		DataSet<Row> source = environment.createInput(inputBuilder.finish());
-
-		//NOTE: in this case (with the Derby driver) setSqlTypes could be skipped, but
-		//some databases don't handle null values correctly when no column type is specified
-		//in PreparedStatement.setObject (see its javadoc for more details)
-		source.output(JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername(JDBCTestBase.DRIVER_CLASS)
-				.setDBUrl(JDBCTestBase.DB_URL)
-				.setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
-				.setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR,Types.DOUBLE,Types.INTEGER})
-				.finish());
-		try {
-			environment.execute();
-		} catch (Exception e) {
-			Assert.fail("JDBC full test failed. " + e.getMessage());
-		}
-
-		try (
-			Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL);
-			PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
-			ResultSet resultSet = statement.executeQuery()
-		) {
-			int count = 0;
-			while (resultSet.next()) {
-				count++;
-			}
-			Assert.assertEquals(JDBCTestBase.testData.length, count);
-		} catch (SQLException e) {
-			Assert.fail("JDBC full test failed. " + e.getMessage());
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
deleted file mode 100644
index efae076..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.sql.ResultSet;
-
-import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
-import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
-import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.core.io.InputSplit;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class JDBCInputFormatTest extends JDBCTestBase {
-
-	private JDBCInputFormat jdbcInputFormat;
-
-	@After
-	public void tearDown() throws IOException {
-		if (jdbcInputFormat != null) {
-			jdbcInputFormat.close();
-		}
-		jdbcInputFormat = null;
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testUntypedRowInfo() throws IOException {
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername("org.apache.derby.jdbc.idontexist")
-				.setDBUrl(DB_URL)
-				.setQuery(SELECT_ALL_BOOKS)
-				.finish();
-		jdbcInputFormat.openInputFormat();
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidDriver() throws IOException {
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername("org.apache.derby.jdbc.idontexist")
-				.setDBUrl(DB_URL)
-				.setQuery(SELECT_ALL_BOOKS)
-				.setRowTypeInfo(rowTypeInfo)
-				.finish();
-		jdbcInputFormat.openInputFormat();
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidURL() throws IOException {
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
-				.setQuery(SELECT_ALL_BOOKS)
-				.setRowTypeInfo(rowTypeInfo)
-				.finish();
-		jdbcInputFormat.openInputFormat();
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidQuery() throws IOException {
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl(DB_URL)
-				.setQuery("iamnotsql")
-				.setRowTypeInfo(rowTypeInfo)
-				.finish();
-		jdbcInputFormat.openInputFormat();
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testIncompleteConfiguration() throws IOException {
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setQuery(SELECT_ALL_BOOKS)
-				.setRowTypeInfo(rowTypeInfo)
-				.finish();
-	}
-
-	@Test
-	public void testJDBCInputFormatWithoutParallelism() throws IOException, InstantiationException, IllegalAccessException {
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl(DB_URL)
-				.setQuery(SELECT_ALL_BOOKS)
-				.setRowTypeInfo(rowTypeInfo)
-				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
-				.finish();
-		//this query does not exploit parallelism
-		Assert.assertEquals(1, jdbcInputFormat.createInputSplits(1).length);
-		jdbcInputFormat.openInputFormat();
-		jdbcInputFormat.open(null);
-		Row row = new Row(5);
-		int recordCount = 0;
-		while (!jdbcInputFormat.reachedEnd()) {
-			Row next = jdbcInputFormat.nextRecord(row);
-			if (next == null) {
-				break;
-			}
-			
-			if (next.productElement(0) != null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass()); }
-			if (next.productElement(1) != null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass()); }
-			if (next.productElement(2) != null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass()); }
-			if (next.productElement(3) != null) { Assert.assertEquals("Field 3 should be double", Double.class, next.productElement(3).getClass()); }
-			if (next.productElement(4) != null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass()); }
-
-			for (int x = 0; x < 5; x++) {
-				if (testData[recordCount][x] != null) {
-					Assert.assertEquals(testData[recordCount][x], next.productElement(x));
-				}
-			}
-			recordCount++;
-		}
-		jdbcInputFormat.close();
-		jdbcInputFormat.closeInputFormat();
-		Assert.assertEquals(testData.length, recordCount);
-	}
-	
-	@Test
-	public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException {
-		final int fetchSize = 1;
-		final long min = Long.parseLong(JDBCTestBase.testData[0][0].toString());
-		final long max = Long.parseLong(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0].toString());
-		ParameterValuesProvider paramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl(DB_URL)
-				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
-				.setRowTypeInfo(rowTypeInfo)
-				.setParametersProvider(paramProvider)
-				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
-				.finish();
-
-		jdbcInputFormat.openInputFormat();
-		InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
-		//this query exploit parallelism (1 split for every id)
-		Assert.assertEquals(testData.length, splits.length);
-		int recordCount = 0;
-		Row row = new Row(5);
-		for (int i = 0; i < splits.length; i++) {
-			jdbcInputFormat.open(splits[i]);
-			while (!jdbcInputFormat.reachedEnd()) {
-				Row next = jdbcInputFormat.nextRecord(row);
-				if (next == null) {
-					break;
-				}
-				if (next.productElement(0) != null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass()); }
-				if (next.productElement(1) != null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass()); }
-				if (next.productElement(2) != null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass()); }
-				if (next.productElement(3) != null) { Assert.assertEquals("Field 3 should be double", Double.class, next.productElement(3).getClass()); }
-				if (next.productElement(4) != null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass()); }
-
-				for (int x = 0; x < 5; x++) {
-					if (testData[recordCount][x] != null) {
-						Assert.assertEquals(testData[recordCount][x], next.productElement(x));
-					}
-				}
-				recordCount++;
-			}
-			jdbcInputFormat.close();
-		}
-		jdbcInputFormat.closeInputFormat();
-		Assert.assertEquals(testData.length, recordCount);
-	}
-	
-	@Test
-	public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException, InstantiationException, IllegalAccessException {
-		Serializable[][] queryParameters = new String[2][1];
-		queryParameters[0] = new String[]{"Kumar"};
-		queryParameters[1] = new String[]{"Tan Ah Teck"};
-		ParameterValuesProvider paramProvider = new GenericParameterValuesProvider(queryParameters);
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl(DB_URL)
-				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
-				.setRowTypeInfo(rowTypeInfo)
-				.setParametersProvider(paramProvider)
-				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
-				.finish();
-		jdbcInputFormat.openInputFormat();
-		InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
-		//this query exploit parallelism (1 split for every queryParameters row)
-		Assert.assertEquals(queryParameters.length, splits.length);
-		int recordCount = 0;
-		Row row = new Row(5);
-		for (int i = 0; i < splits.length; i++) {
-			jdbcInputFormat.open(splits[i]);
-			while (!jdbcInputFormat.reachedEnd()) {
-				Row next = jdbcInputFormat.nextRecord(row);
-				if (next == null) {
-					break;
-				}
-				if (next.productElement(0) != null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass()); }
-				if (next.productElement(1) != null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass()); }
-				if (next.productElement(2) != null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass()); }
-				if (next.productElement(3) != null) { Assert.assertEquals("Field 3 should be double", Double.class, next.productElement(3).getClass()); }
-				if (next.productElement(4) != null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass()); }
-
-				recordCount++;
-			}
-			jdbcInputFormat.close();
-		}
-		Assert.assertEquals(3, recordCount);
-		jdbcInputFormat.closeInputFormat();
-	}
-	
-	@Test
-	public void testEmptyResults() throws IOException, InstantiationException, IllegalAccessException {
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl(DB_URL)
-				.setQuery(SELECT_EMPTY)
-				.setRowTypeInfo(rowTypeInfo)
-				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
-				.finish();
-		jdbcInputFormat.openInputFormat();
-		jdbcInputFormat.open(null);
-		Row row = new Row(5);
-		int recordsCnt = 0;
-		while (!jdbcInputFormat.reachedEnd()) {
-			Assert.assertNull(jdbcInputFormat.nextRecord(row));
-			recordsCnt++;
-		}
-		jdbcInputFormat.close();
-		jdbcInputFormat.closeInputFormat();
-		Assert.assertEquals(0, recordsCnt);
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
deleted file mode 100644
index 086a84c..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.Row;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class JDBCOutputFormatTest extends JDBCTestBase {
-
-	private JDBCOutputFormat jdbcOutputFormat;
-	private Tuple5<Integer, String, String, Double, String> tuple5 = new Tuple5<>();
-
-	@After
-	public void tearDown() throws IOException {
-		if (jdbcOutputFormat != null) {
-			jdbcOutputFormat.close();
-		}
-		jdbcOutputFormat = null;
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidDriver() throws IOException {
-		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername("org.apache.derby.jdbc.idontexist")
-				.setDBUrl(DB_URL)
-				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
-				.finish();
-		jdbcOutputFormat.open(0, 1);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidURL() throws IOException {
-		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
-				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
-				.finish();
-		jdbcOutputFormat.open(0, 1);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidQuery() throws IOException {
-		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl(DB_URL)
-				.setQuery("iamnotsql")
-				.finish();
-		jdbcOutputFormat.open(0, 1);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testIncompleteConfiguration() throws IOException {
-		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
-				.finish();
-	}
-
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testIncompatibleTypes() throws IOException {
-		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl(DB_URL)
-				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
-				.finish();
-		jdbcOutputFormat.open(0, 1);
-
-		tuple5.setField(4, 0);
-		tuple5.setField("hello", 1);
-		tuple5.setField("world", 2);
-		tuple5.setField(0.99, 3);
-		tuple5.setField("imthewrongtype", 4);
-
-		Row row = new Row(tuple5.getArity());
-		for (int i = 0; i < tuple5.getArity(); i++) {
-			row.setField(i, tuple5.getField(i));
-		}
-		jdbcOutputFormat.writeRecord(row);
-		jdbcOutputFormat.close();
-	}
-
-	@Test
-	public void testJDBCOutputFormat() throws IOException, InstantiationException, IllegalAccessException {
-
-		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl(DB_URL)
-				.setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
-				.finish();
-		jdbcOutputFormat.open(0, 1);
-
-		for (int i = 0; i < testData.length; i++) {
-			Row row = new Row(testData[i].length);
-			for (int j = 0; j < testData[i].length; j++) {
-				row.setField(j, testData[i][j]);
-			}
-			jdbcOutputFormat.writeRecord(row);
-		}
-
-		jdbcOutputFormat.close();
-
-		try (
-			Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL);
-			PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
-			ResultSet resultSet = statement.executeQuery()
-		) {
-			int recordCount = 0;
-			while (resultSet.next()) {
-				Row row = new Row(tuple5.getArity());
-				for (int i = 0; i < tuple5.getArity(); i++) {
-					row.setField(i, resultSet.getObject(i + 1));
-				}
-				if (row.productElement(0) != null) {
-					Assert.assertEquals("Field 0 should be int", Integer.class, row.productElement(0).getClass());
-				}
-				if (row.productElement(1) != null) {
-					Assert.assertEquals("Field 1 should be String", String.class, row.productElement(1).getClass());
-				}
-				if (row.productElement(2) != null) {
-					Assert.assertEquals("Field 2 should be String", String.class, row.productElement(2).getClass());
-				}
-				if (row.productElement(3) != null) {
-					Assert.assertEquals("Field 3 should be double", Double.class, row.productElement(3).getClass());
-				}
-				if (row.productElement(4) != null) {
-					Assert.assertEquals("Field 4 should be int", Integer.class, row.productElement(4).getClass());
-				}
-
-				for (int x = 0; x < tuple5.getArity(); x++) {
-					if (JDBCTestBase.testData[recordCount][x] != null) {
-						Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.productElement(x));
-					}
-				}
-
-				recordCount++;
-			}
-			Assert.assertEquals(JDBCTestBase.testData.length, recordCount);
-		} catch (SQLException e) {
-			Assert.fail("JDBC OutputFormat test failed. " + e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
deleted file mode 100644
index 69ad693..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.OutputStream;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-
-/**
- * Base test class for JDBC Input and Output formats
- */
-public class JDBCTestBase {
-	
-	public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
-	public static final String DB_URL = "jdbc:derby:memory:ebookshop";
-	public static final String INPUT_TABLE = "books";
-	public static final String OUTPUT_TABLE = "newbooks";
-	public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
-	public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE;
-	public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0";
-	public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)";
-	public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
-	public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?";
-	
-	protected static Connection conn;
-
-	public static final Object[][] testData = {
-			{1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11},
-			{1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
-			{1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
-			{1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
-			{1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55},
-			{1006, ("A Teaspoon of Java 1.4"), ("Kevin Jones"), 66.66, 66},
-			{1007, ("A Teaspoon of Java 1.5"), ("Kevin Jones"), 77.77, 77},
-			{1008, ("A Teaspoon of Java 1.6"), ("Kevin Jones"), 88.88, 88},
-			{1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 99.99, 99},
-			{1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010}};
-
-	public static final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
-		BasicTypeInfo.INT_TYPE_INFO,
-		BasicTypeInfo.STRING_TYPE_INFO,
-		BasicTypeInfo.STRING_TYPE_INFO,
-		BasicTypeInfo.DOUBLE_TYPE_INFO,
-		BasicTypeInfo.INT_TYPE_INFO
-	};
-	
-	public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
-
-	public static String getCreateQuery(String tableName) {
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE ");
-		sqlQueryBuilder.append(tableName).append(" (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-		return sqlQueryBuilder.toString();
-	}
-	
-	public static String getInsertQuery() {
-		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-		for (int i = 0; i < JDBCTestBase.testData.length; i++) {
-			sqlQueryBuilder.append("(")
-			.append(JDBCTestBase.testData[i][0]).append(",'")
-			.append(JDBCTestBase.testData[i][1]).append("','")
-			.append(JDBCTestBase.testData[i][2]).append("',")
-			.append(JDBCTestBase.testData[i][3]).append(",")
-			.append(JDBCTestBase.testData[i][4]).append(")");
-			if (i < JDBCTestBase.testData.length - 1) {
-				sqlQueryBuilder.append(",");
-			}
-		}
-		return sqlQueryBuilder.toString();
-	}
-	
-	public static final OutputStream DEV_NULL = new OutputStream() {
-		@Override
-		public void write(int b) {
-		}
-	};
-
-	public static void prepareTestDb() throws Exception {
-		System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
-		Class.forName(DRIVER_CLASS);
-		Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
-
-		//create input table
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(getCreateQuery(INPUT_TABLE));
-		stat.close();
-
-		//create output table
-		stat = conn.createStatement();
-		stat.executeUpdate(getCreateQuery(OUTPUT_TABLE));
-		stat.close();
-
-		//prepare input data
-		stat = conn.createStatement();
-		stat.execute(JDBCTestBase.getInsertQuery());
-		stat.close();
-
-		conn.close();
-	}
-
-	@BeforeClass
-	public static void setUpClass() throws SQLException {
-		try {
-			System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
-			prepareDerbyDatabase();
-		} catch (ClassNotFoundException e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
-		Class.forName(DRIVER_CLASS);
-		conn = DriverManager.getConnection(DB_URL + ";create=true");
-		createTable(INPUT_TABLE);
-		createTable(OUTPUT_TABLE);
-		insertDataIntoInputTable();
-		conn.close();
-	}
-	
-	private static void createTable(String tableName) throws SQLException {
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(getCreateQuery(tableName));
-		stat.close();
-	}
-	
-	private static void insertDataIntoInputTable() throws SQLException {
-		Statement stat = conn.createStatement();
-		stat.execute(JDBCTestBase.getInsertQuery());
-		stat.close();
-	}
-
-	@AfterClass
-	public static void tearDownClass() {
-		cleanUpDerbyDatabases();
-	}
-
-	private static void cleanUpDerbyDatabases() {
-		try {
-			Class.forName(DRIVER_CLASS);
-			conn = DriverManager.getConnection(DB_URL + ";create=true");
-			Statement stat = conn.createStatement();
-			stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
-			stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE);
-			stat.close();
-			conn.close();
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-}
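
For reference, a minimal sketch of a test built on this base class (hypothetical; it
assumes JDBCInputFormat's builder methods setDBUrl/setQuery/setRowTypeInfo/finish and
its openInputFormat()/open(split) lifecycle):

    import org.apache.flink.api.table.Row;
    import org.junit.Assert;
    import org.junit.Test;

    public class JDBCTestBaseUsageSketch extends JDBCTestBase {

        @Test
        public void shouldReadAllBooks() throws Exception {
            JDBCInputFormat format = JDBCInputFormat.buildJDBCInputFormat()
                    .setDrivername(DRIVER_CLASS)
                    .setDBUrl(DB_URL)
                    .setQuery(SELECT_ALL_BOOKS)
                    .setRowTypeInfo(rowTypeInfo)
                    .finish();
            format.openInputFormat();
            format.open(null); // single, non-parameterized query: no input split needed
            int count = 0;
            Row reuse = new Row(fieldTypes.length);
            while (!format.reachedEnd()) {
                format.nextRecord(reuse);
                count++;
            }
            Assert.assertEquals(testData.length, count);
            format.close();
            format.closeInputFormat();
        }
    }

The base class's @BeforeClass/@AfterClass hooks create, fill, and drop the Derby
tables, so the test body only exercises the format itself.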

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties b/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties
deleted file mode 100644
index 2fb9345..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml b/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml
deleted file mode 100644
index 8b3bb27..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/pom.xml b/flink-batch-connectors/pom.xml
deleted file mode 100644
index d4f65b3..0000000
--- a/flink-batch-connectors/pom.xml
+++ /dev/null
@@ -1,45 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-parent</artifactId>
-		<version>1.2-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-
-	<artifactId>flink-batch-connectors</artifactId>
-	<name>flink-batch-connectors</name>
-	<packaging>pom</packaging>
-
-	<modules>
-		<module>flink-avro</module>
-		<module>flink-jdbc</module>
-		<module>flink-hadoop-compatibility</module>
-		<module>flink-hbase</module>
-		<module>flink-hcatalog</module>
-	</modules>
-	
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/pom.xml b/flink-connectors/flink-avro/pom.xml
new file mode 100644
index 0000000..cdd7c78
--- /dev/null
+++ b/flink-connectors/flink-avro/pom.xml
@@ -0,0 +1,216 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.2-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-avro_2.10</artifactId>
+	<name>flink-avro</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+			<!-- version is derived from base module -->
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>create-test-dependency</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+						<configuration>
+							<archive>
+								<manifest>
+									<mainClass>org.apache.flink.api.avro.testjar.AvroExternalJarProgram</mainClass>
+								</manifest>
+							</archive>
+							<finalName>maven</finalName>
+							<attach>false</attach>
+							<descriptors>
+								<descriptor>src/test/assembly/test-assembly.xml</descriptor>
+							</descriptors>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<!-- Remove the AvroExternalJarProgram code from the test-classes directory, since it mustn't be on the
+			classpath when running the tests; only then do the tests actually verify that the user code
+			class loader is properly used. -->
+			<plugin>
+				<artifactId>maven-clean-plugin</artifactId>
+				<version>2.5</version><!--$NO-MVN-MAN-VER$-->
+				<executions>
+					<execution>
+						<id>remove-avroexternalprogram</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>clean</goal>
+						</goals>
+						<configuration>
+							<excludeDefaultDirectories>true</excludeDefaultDirectories>
+							<filesets>
+								<fileset>
+									<directory>${project.build.testOutputDirectory}</directory>
+									<includes>
+										<include>**/testjar/*.class</include>
+									</includes>
+								</fileset>
+							</filesets>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<!-- Generate Test class from avro schema -->
+			<plugin>
+				<groupId>org.apache.avro</groupId>
+				<artifactId>avro-maven-plugin</artifactId>
+				<version>1.7.7</version>
+				<executions>
+					<execution>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>schema</goal>
+						</goals>
+						<configuration>
+							<testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
+							<testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+		
+		<pluginManagement>
+			<plugins>
+				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-assembly-plugin</artifactId>
+										<versionRange>[2.4,)</versionRange>
+										<goals>
+											<goal>single</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-clean-plugin</artifactId>
+										<versionRange>[1,)</versionRange>
+										<goals>
+											<goal>clean</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.avro</groupId>
+										<artifactId>avro-maven-plugin</artifactId>
+										<versionRange>[1.7.7,)</versionRange>
+										<goals>
+											<goal>schema</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
new file mode 100644
index 0000000..59da4cb
--- /dev/null
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.io.Decoder;
+import org.apache.avro.util.Utf8;
+
+
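+/**
+ * An Avro {@link Decoder} that reads from a {@link java.io.DataInput}, using fixed-width
+ * primitives rather than Avro's variable-length binary encoding (except for collection
+ * counts, which are var-length). Counterpart of {@link DataOutputEncoder}.
+ */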
+public class DataInputDecoder extends Decoder {
+	
+	private final Utf8 stringDecoder = new Utf8();
+	
+	private DataInput in;
+	
+	public void setIn(DataInput in) {
+		this.in = in;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// primitives
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void readNull() {}
+	
+
+	@Override
+	public boolean readBoolean() throws IOException {
+		return in.readBoolean();
+	}
+
+	@Override
+	public int readInt() throws IOException {
+		return in.readInt();
+	}
+
+	@Override
+	public long readLong() throws IOException {
+		return in.readLong();
+	}
+
+	@Override
+	public float readFloat() throws IOException {
+		return in.readFloat();
+	}
+
+	@Override
+	public double readDouble() throws IOException {
+		return in.readDouble();
+	}
+	
+	@Override
+	public int readEnum() throws IOException {
+		return readInt();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// bytes
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void readFixed(byte[] bytes, int start, int length) throws IOException {
+		in.readFully(bytes, start, length);
+	}
+	
+	@Override
+	public ByteBuffer readBytes(ByteBuffer old) throws IOException {
+		int length = readInt();
+		ByteBuffer result;
+		if (old != null && length <= old.capacity() && old.hasArray()) {
+			result = old;
+			result.clear();
+		} else {
+			result = ByteBuffer.allocate(length);
+		}
+		in.readFully(result.array(), result.arrayOffset() + result.position(), length);
+		result.limit(length);
+		return result;
+	}
+	
+	
+	@Override
+	public void skipFixed(int length) throws IOException {
+		skipBytes(length);
+	}
+	
+	@Override
+	public void skipBytes() throws IOException {
+		int num = readInt();
+		skipBytes(num);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// strings
+	// --------------------------------------------------------------------------------------------
+	
+	
+	@Override
+	public Utf8 readString(Utf8 old) throws IOException {
+		int length = readInt();
+		Utf8 result = (old != null ? old : new Utf8());
+		result.setByteLength(length);
+		
+		if (length > 0) {
+			in.readFully(result.getBytes(), 0, length);
+		}
+		
+		return result;
+	}
+
+	@Override
+	public String readString() throws IOException {
+		return readString(stringDecoder).toString();
+	}
+
+	@Override
+	public void skipString() throws IOException {
+		int len = readInt();
+		skipBytes(len);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// collection types
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public long readArrayStart() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long arrayNext() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long skipArray() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long readMapStart() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long mapNext() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long skipMap() throws IOException {
+		return readVarLongCount(in);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// union
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int readIndex() throws IOException {
+		return readInt();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// utils
+	// --------------------------------------------------------------------------------------------
+	
+	private void skipBytes(int num) throws IOException {
+		while (num > 0) {
+			num -= in.skipBytes(num);
+		}
+	}
+	
+	public static long readVarLongCount(DataInput in) throws IOException {
+		long value = in.readUnsignedByte();
+
+		if ((value & 0x80) == 0) {
+			return value;
+		}
+		else {
+			long curr;
+			int shift = 7;
+			value = value & 0x7f;
+			while (((curr = in.readUnsignedByte()) & 0x80) != 0){
+				value |= (curr & 0x7f) << shift;
+				shift += 7;
+			}
+			value |= curr << shift;
+			return value;
+		}
+	}
+}
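
For orientation, a sketch of how the decoder is meant to be driven (hypothetical
helper; it assumes the bytes were produced by the matching DataOutputEncoder, since
this pair uses fixed-width primitives instead of Avro's standard varint encoding):

    import java.io.DataInputStream;
    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    static GenericRecord decodeOne(DataInputStream in, Schema schema) throws IOException {
        DataInputDecoder decoder = new DataInputDecoder();
        decoder.setIn(in); // any java.io.DataInput works here
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        return reader.read(null, decoder); // null = no reuse record
    }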

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
new file mode 100644
index 0000000..0102cc1
--- /dev/null
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.io.Encoder;
+import org.apache.avro.util.Utf8;
+
+
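+/**
+ * An Avro {@link Encoder} that writes to a {@link java.io.DataOutput}, using fixed-width
+ * primitives rather than Avro's variable-length binary encoding (except for collection
+ * counts, which are var-length). Counterpart of {@link DataInputDecoder}.
+ */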
+public final class DataOutputEncoder extends Encoder implements java.io.Serializable {
+	
+	private static final long serialVersionUID = 1L;
+	
+	private DataOutput out;
+	
+	
+	public void setOut(DataOutput out) {
+		this.out = out;
+	}
+
+
+	@Override
+	public void flush() throws IOException {}
+
+	// --------------------------------------------------------------------------------------------
+	// primitives
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void writeNull() {}
+	
+
+	@Override
+	public void writeBoolean(boolean b) throws IOException {
+		out.writeBoolean(b);
+	}
+
+	@Override
+	public void writeInt(int n) throws IOException {
+		out.writeInt(n);
+	}
+
+	@Override
+	public void writeLong(long n) throws IOException {
+		out.writeLong(n);
+	}
+
+	@Override
+	public void writeFloat(float f) throws IOException {
+		out.writeFloat(f);
+	}
+
+	@Override
+	public void writeDouble(double d) throws IOException {
+		out.writeDouble(d);
+	}
+	
+	@Override
+	public void writeEnum(int e) throws IOException {
+		out.writeInt(e);
+	}
+	
+	
+	// --------------------------------------------------------------------------------------------
+	// bytes
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void writeFixed(byte[] bytes, int start, int len) throws IOException {
+		out.write(bytes, start, len);
+	}
+	
+	@Override
+	public void writeBytes(byte[] bytes, int start, int len) throws IOException {
+		out.writeInt(len);
+		if (len > 0) {
+			out.write(bytes, start, len);
+		}
+	}
+	
+	@Override
+	public void writeBytes(ByteBuffer bytes) throws IOException {
+		int num = bytes.remaining();
+		out.writeInt(num);
+		
+		if (num > 0) {
+			writeFixed(bytes);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// strings
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void writeString(String str) throws IOException {
+		byte[] bytes = Utf8.getBytesFor(str);
+		writeBytes(bytes, 0, bytes.length);
+	}
+	
+	@Override
+	public void writeString(Utf8 utf8) throws IOException {
+		writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// collection types
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void writeArrayStart() {}
+
+	@Override
+	public void setItemCount(long itemCount) throws IOException {
+		if (itemCount > 0) {
+			writeVarLongCount(out, itemCount);
+		}
+	}
+
+	@Override
+	public void startItem() {}
+
+	@Override
+	public void writeArrayEnd() throws IOException {
+		// write a single byte 0, shortcut for a var-length long of 0
+		out.write(0);
+	}
+
+	@Override
+	public void writeMapStart() {}
+
+	@Override
+	public void writeMapEnd() throws IOException {
+		// write a single byte 0, shortcut for a var-length long of 0
+		out.write(0);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// union
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void writeIndex(int unionIndex) throws IOException {
+		out.writeInt(unionIndex);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// utils
+	// --------------------------------------------------------------------------------------------
+		
+	
+	public static void writeVarLongCount(DataOutput out, long val) throws IOException {
+		if (val < 0) {
+			throw new IOException("Illegal count (must be non-negative): " + val);
+		}
+		
+		while ((val & ~0x7FL) != 0) {
+			out.write(((int) val) | 0x80);
+			val >>>= 7;
+		}
+		out.write((int) val);
+	}
+}
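
The count helpers implement an unsigned base-128 varint: seven payload bits per byte,
least-significant group first, with the high bit flagging that another byte follows
(so 300 = 0x12C is written as the two bytes 0xAC 0x02). A small round-trip sketch
(hypothetical, pairing this class with DataInputDecoder):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    static void varLongCountRoundTrip() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputEncoder.writeVarLongCount(new DataOutputStream(bytes), 300L);
        // bytes.toByteArray() now holds { (byte) 0xAC, (byte) 0x02 }
        long decoded = DataInputDecoder.readVarLongCount(
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        if (decoded != 300L) {
            throw new AssertionError("round trip failed: " + decoded);
        }
    }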

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
new file mode 100644
index 0000000..709c4f1
--- /dev/null
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.avro;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.avro.file.SeekableInput;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+
+/**
+ * Code copied from org.apache.avro.mapred.FSInput (which is Apache licensed as well).
+ * 
+ * The wrapper keeps track of the position in the data stream.
+ */
+public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
+	private final FSDataInputStream stream;
+	private long pos;
+	private long len;
+
+	public FSDataInputStreamWrapper(FSDataInputStream stream, long len) {
+		this.stream = stream;
+		this.pos = 0;
+		this.len = len;
+	}
+
+	public long length() throws IOException {
+		return this.len;
+	}
+
+	public int read(byte[] b, int off, int len) throws IOException {
+		int read = stream.read(b, off, len);
+		// read() returns -1 at end of stream; only advance the tracked position by bytes actually read
+		if (read > 0) {
+			pos += read;
+		}
+		return read;
+	}
+
+	public void seek(long p) throws IOException {
+		stream.seek(p);
+		pos = p;
+	}
+
+	public long tell() throws IOException {
+		return pos;
+	}
+
+	public void close() throws IOException {
+		stream.close();
+	}
+}
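
A sketch of the intended use, handing a Flink stream to Avro's container-file reader
(hypothetical helper; assumes `path` points to an Avro container file reachable
through a Flink FileSystem):

    import java.io.IOException;

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    static void printRecords(Path path) throws IOException {
        FileSystem fs = path.getFileSystem();
        long len = fs.getFileStatus(path).getLen();
        FSDataInputStreamWrapper in = new FSDataInputStreamWrapper(fs.open(path), len);
        DataFileReader<GenericRecord> reader =
                new DataFileReader<GenericRecord>(in, new GenericDatumReader<GenericRecord>());
        while (reader.hasNext()) {
            System.out.println(reader.next());
        }
        reader.close();
    }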