You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/08/04 20:59:59 UTC

[1/3] flink git commit: [FLINK-2005] Remove Record API from jdbc module

Repository: flink
Updated Branches:
  refs/heads/master d570d078a -> 30761572b


[FLINK-2005] Remove Record API from jdbc module

This closes #982


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06b37bf5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06b37bf5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06b37bf5

Branch: refs/heads/master
Commit: 06b37bf550315bd1d5be7dc3ed6638fd21768e1a
Parents: d570d07
Author: zentol <s....@web.de>
Authored: Tue Aug 4 12:45:22 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Aug 4 18:13:30 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-jdbc/pom.xml                |  12 -
 .../java/record/io/jdbc/JDBCInputFormat.java    | 389 -------------------
 .../java/record/io/jdbc/JDBCOutputFormat.java   | 359 -----------------
 .../record/io/jdbc/example/JDBCExample.java     | 136 -------
 .../java/record/io/jdbc/DevNullLogStream.java   |  30 --
 .../record/io/jdbc/JDBCInputFormatTest.java     | 214 ----------
 .../record/io/jdbc/JDBCOutputFormatTest.java    | 225 -----------
 7 files changed, 1365 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/pom.xml b/flink-staging/flink-jdbc/pom.xml
index 7b499a7..a3976c1 100644
--- a/flink-staging/flink-jdbc/pom.xml
+++ b/flink-staging/flink-jdbc/pom.xml
@@ -41,18 +41,6 @@ under the License.
 			<artifactId>flink-java</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-				
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients</artifactId>
-			<version>${project.version}</version>
-		</dependency>
 
 		<dependency>
 			<groupId>org.apache.derby</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
deleted file mode 100644
index 3cd295b..0000000
--- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.io.NonParallelInput;
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.StringValue;
-
-/**
- * InputFormat to read data from a database and generate PactReords.
- * The InputFormat has to be configured with the query, and either all
- * connection parameters or a complete database URL.{@link Configuration} The position of a value inside a Record is
- * determined by the table
- * returned.
- * 
- * @see Configuration
- * @see Record
- * @see DriverManager
- */
-public class JDBCInputFormat extends GenericInputFormat implements NonParallelInput {
-
-	private static final long serialVersionUID = 1L;
-	
-	@SuppressWarnings("unused")
-	private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
-	
-
-	public final String DRIVER_KEY = "driver";
-	public final String USERNAME_KEY = "username";
-	public final String PASSWORD_KEY = "password";
-	public final String URL_KEY = "url";
-	public final String QUERY_KEY = "query";
-
-
-	private String username;
-	private String password;
-	private String driverName;
-	private String dbURL;
-	private String query;
-
-	
-	private transient Connection dbConn;
-	private transient Statement statement;
-	private transient ResultSet resultSet;
-
-
-	/**
-	 * Creates a non-configured JDBCInputFormat. This format has to be
-	 * configured using configure(configuration).
-	 */
-	public JDBCInputFormat() {}
-
-	/**
-	 * Creates a JDBCInputFormat and configures it.
-	 * 
-	 * @param driverName
-	 *        JDBC-Drivename
-	 * @param dbURL
-	 *        Formatted URL containing all connection parameters.
-	 * @param username
-	 * @param password
-	 * @param query
-	 *        Query to execute.
-	 */
-	public JDBCInputFormat(String driverName, String dbURL, String username, String password, String query) {
-		this.driverName = driverName;
-		this.query = query;
-		this.dbURL = dbURL;
-		this.username = username;
-		this.password = password;
-	}
-
-	/**
-	 * Creates a JDBCInputFormat and configures it.
-	 * 
-	 * @param driverName
-	 *        JDBC-Drivername
-	 * @param dbURL
-	 *        Formatted URL containing all connection parameters.
-	 * @param query
-	 *        Query to execute.
-	 */
-	public JDBCInputFormat(String driverName, String dbURL, String query) {
-		this(driverName, dbURL, "", "", query);
-	}
-
-	/**
-	 * Creates a JDBCInputFormat and configures it.
-	 * 
-	 * @param parameters
-	 *        Configuration with all connection parameters.
-	 * @param query
-	 *        Query to execute.
-	 */
-	public JDBCInputFormat(Configuration parameters, String query) {
-		this.driverName = parameters.getString(DRIVER_KEY, "");
-		this.username = parameters.getString(USERNAME_KEY, "");
-		this.password = parameters.getString(PASSWORD_KEY, "");
-		this.dbURL = parameters.getString(URL_KEY, "");
-		this.query = query;
-	}
-
-	
-	/**
-	 * Configures this JDBCInputFormat. This includes setting the connection
-	 * parameters (if necessary), establishing the connection and executing the
-	 * query.
-	 * 
-	 * @param parameters
-	 *        Configuration containing all or no parameters.
-	 */
-	@Override
-	public void configure(Configuration parameters) {
-		boolean needConfigure = isFieldNullOrEmpty(this.query) || isFieldNullOrEmpty(this.dbURL);
-		if (needConfigure) {
-			this.driverName = parameters.getString(DRIVER_KEY, null);
-			this.username = parameters.getString(USERNAME_KEY, null);
-			this.password = parameters.getString(PASSWORD_KEY, null);
-			this.query = parameters.getString(QUERY_KEY, null);
-			this.dbURL = parameters.getString(URL_KEY, null);
-		}
-
-		try {
-			prepareQueryExecution();
-		} catch (SQLException e) {
-			throw new IllegalArgumentException("Configure failed:\t!", e);
-		}
-	}
-
-	/**
-	 * Enters data value from the current resultSet into a Record.
-	 * 
-	 * @param pos
-	 *        Record position to be set.
-	 * @param type
-	 *        SQL type of the resultSet value.
-	 * @param record
-	 *        Target Record.
-	 */
-	private void retrieveTypeAndFillRecord(int pos, int type, Record record) throws SQLException,
-			NotTransformableSQLFieldException {
-		switch (type) {
-		case java.sql.Types.NULL:
-			record.setField(pos, NullValue.getInstance());
-			break;
-		case java.sql.Types.BOOLEAN:
-			record.setField(pos, new BooleanValue(resultSet.getBoolean(pos + 1)));
-			break;
-		case java.sql.Types.BIT:
-			record.setField(pos, new BooleanValue(resultSet.getBoolean(pos + 1)));
-			break;
-		case java.sql.Types.CHAR:
-			record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
-			break;
-		case java.sql.Types.NCHAR:
-			record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
-			break;
-		case java.sql.Types.VARCHAR:
-			record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
-			break;
-		case java.sql.Types.LONGVARCHAR:
-			record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
-			break;
-		case java.sql.Types.LONGNVARCHAR:
-			record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
-			break;
-		case java.sql.Types.TINYINT:
-			record.setField(pos, new ShortValue(resultSet.getShort(pos + 1)));
-			break;
-		case java.sql.Types.SMALLINT:
-			record.setField(pos, new ShortValue(resultSet.getShort(pos + 1)));
-			break;
-		case java.sql.Types.BIGINT:
-			record.setField(pos, new LongValue(resultSet.getLong(pos + 1)));
-			break;
-		case java.sql.Types.INTEGER:
-			record.setField(pos, new IntValue(resultSet.getInt(pos + 1)));
-			break;
-		case java.sql.Types.FLOAT:
-			record.setField(pos, new DoubleValue(resultSet.getDouble(pos + 1)));
-			break;
-		case java.sql.Types.REAL:
-			record.setField(pos, new FloatValue(resultSet.getFloat(pos + 1)));
-			break;
-		case java.sql.Types.DOUBLE:
-			record.setField(pos, new DoubleValue(resultSet.getDouble(pos + 1)));
-			break;
-		case java.sql.Types.DECIMAL:
-			record.setField(pos, new DoubleValue(resultSet.getBigDecimal(pos + 1).doubleValue()));
-			break;
-		case java.sql.Types.NUMERIC:
-			record.setField(pos, new DoubleValue(resultSet.getBigDecimal(pos + 1).doubleValue()));
-			break;
-		case java.sql.Types.DATE:
-			record.setField(pos, new StringValue(resultSet.getDate(pos + 1).toString()));
-			break;
-		case java.sql.Types.TIME:
-			record.setField(pos, new LongValue(resultSet.getTime(pos + 1).getTime()));
-			break;
-		case java.sql.Types.TIMESTAMP:
-			record.setField(pos, new StringValue(resultSet.getTimestamp(pos + 1).toString()));
-			break;
-		case java.sql.Types.SQLXML:
-			record.setField(pos, new StringValue(resultSet.getSQLXML(pos + 1).toString()));
-			break;
-		default:
-			throw new NotTransformableSQLFieldException("Unknown sql-type [" + type + "]on column [" + pos + "]");
-
-			// case java.sql.Types.BINARY:
-			// case java.sql.Types.VARBINARY:
-			// case java.sql.Types.LONGVARBINARY:
-			// case java.sql.Types.ARRAY:
-			// case java.sql.Types.JAVA_OBJECT:
-			// case java.sql.Types.BLOB:
-			// case java.sql.Types.CLOB:
-			// case java.sql.Types.NCLOB:
-			// case java.sql.Types.DATALINK:
-			// case java.sql.Types.DISTINCT:
-			// case java.sql.Types.OTHER:
-			// case java.sql.Types.REF:
-			// case java.sql.Types.ROWID:
-			// case java.sql.Types.STRUCT:
-		}
-	}
-
-	private boolean isFieldNullOrEmpty(String field) {
-		return (field == null || field.length() == 0);
-	}
-
-	private void prepareQueryExecution() throws SQLException {
-		setClassForDBType();
-		prepareCredentialsAndExecute();
-	}
-
-	/**
-	 * Loads appropriate JDBC driver.
-	 * 
-	 * @param dbType
-	 *        Type of the database.
-	 * @return boolean value, indication whether an appropriate driver could be
-	 *         found.
-	 */
-	private void setClassForDBType() {
-		try {
-			Class.forName(driverName);
-		} catch (ClassNotFoundException cnfe) {
-			throw new IllegalArgumentException("JDBC-Class not found:\t" + cnfe.getLocalizedMessage());
-		}
-	}
-
-	private void prepareCredentialsAndExecute() throws SQLException {
-		if (isFieldNullOrEmpty(username)) {
-			prepareConnection(dbURL);
-		} else {
-			prepareConnection();
-		}
-		executeQuery();
-	}
-
-	/**
-	 * Establishes a connection to a database.
-	 * 
-	 * @param dbURL
-	 *        Assembled URL containing all connection parameters.
-	 * @return boolean value, indicating whether a connection could be
-	 *         established
-	 */
-	private void prepareConnection(String dbURL) throws SQLException {
-		dbConn = DriverManager.getConnection(dbURL);
-	}
-
-	/**
-	 * Assembles the Database URL and establishes a connection.
-	 * 
-	 * @param dbType
-	 *        Type of the database.
-	 * @param username
-	 *        Login username.
-	 * @param password
-	 *        Login password.
-	 * @return boolean value, indicating whether a connection could be
-	 *         established
-	 */
-	private void prepareConnection() throws SQLException {
-		dbConn = DriverManager.getConnection(dbURL, username, password);
-	}
-
-	private void executeQuery() throws SQLException {
-		statement = dbConn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
-		resultSet = statement.executeQuery(this.query);
-	}
-
-	/**
-	 * Checks whether all data has been read.
-	 * 
-	 * @return boolean value indication whether all data has been read.
-	 */
-	@Override
-	public boolean reachedEnd() {
-		try {
-			if (resultSet.isLast()) {
-				resultSet.close();
-				statement.close();
-				dbConn.close();
-				return true;
-			} else {
-				return false;
-			}
-		} catch (SQLException e) {
-			throw new IllegalArgumentException("Couldn't evaluate reachedEnd():\t" + e.getMessage());
-		} catch (NullPointerException e) {
-			throw new IllegalArgumentException("Couldn't access resultSet:\t" + e.getMessage());
-		}
-	}
-
-	/**
-	 * Stores the next resultSet row in a Record
-	 * 
-	 * @param record
-	 *        target Record
-	 * @return boolean value indicating that the operation was successful
-	 */
-	@Override
-	public Record nextRecord(Record record) {
-		try {
-			resultSet.next();
-			ResultSetMetaData rsmd = resultSet.getMetaData();
-			int column_count = rsmd.getColumnCount();
-			record.setNumFields(column_count);
-
-			for (int pos = 0; pos < column_count; pos++) {
-				int type = rsmd.getColumnType(pos + 1);
-				retrieveTypeAndFillRecord(pos, type, record);
-			}
-			return record;
-		} catch (SQLException e) {
-			throw new IllegalArgumentException("Couldn't read data:\t" + e.getMessage());
-		} catch (NotTransformableSQLFieldException e) {
-			throw new IllegalArgumentException("Couldn't read data because of unknown column sql-type:\t"
-				+ e.getMessage());
-		} catch (NullPointerException e) {
-			throw new IllegalArgumentException("Couldn't access resultSet:\t" + e.getMessage());
-		}
-	}
-	
-	public static class NotTransformableSQLFieldException extends Exception {
-
-		private static final long serialVersionUID = 1L;
-
-		public NotTransformableSQLFieldException(String message) {
-			super(message);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
deleted file mode 100644
index 780001a..0000000
--- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.CharValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-
-public class JDBCOutputFormat implements OutputFormat<Record> {
-	private static final long serialVersionUID = 1L;
-
-	private static final int DEFAULT_BATCH_INTERVERAL = 5000;
-	
-	public static final String DRIVER_KEY = "driver";
-	public static final String USERNAME_KEY = "username";
-	public static final String PASSWORD_KEY = "password";
-	public static final String URL_KEY = "url";
-	public static final String QUERY_KEY = "query";
-	public static final String FIELD_COUNT_KEY = "fields";
-	public static final String FIELD_TYPE_KEY = "type";
-	public static final String BATCH_INTERVAL = "batchInt";
-
-	private Connection dbConn;
-	private PreparedStatement upload;
-
-	private String username;
-	private String password;
-	private String driverName;
-	private String dbURL;
-
-	private String query;
-	private int fieldCount;
-	private Class<? extends Value>[] fieldClasses;
-	
-	/**
-	 * Variable indicating the current number of insert sets in a batch.
-	 */
-	private int batchCount = 0;
-	
-	/**
-	 * Commit interval of batches.
-	 * High batch interval: faster inserts, more memory required (reduce if OutOfMemoryExceptions occur)
-	 * low batch interval: slower inserts, less memory.
-	 */
-	private int batchInterval = DEFAULT_BATCH_INTERVERAL;
-	
-
-	/**
-	 * Configures this JDBCOutputFormat.
-	 * 
-	 * @param parameters
-	 *        Configuration containing all parameters.
-	 */
-	@Override
-	public void configure(Configuration parameters) {
-		this.driverName = parameters.getString(DRIVER_KEY, null);
-		this.username = parameters.getString(USERNAME_KEY, null);
-		this.password = parameters.getString(PASSWORD_KEY, null);
-		this.dbURL = parameters.getString(URL_KEY, null);
-		this.query = parameters.getString(QUERY_KEY, null);
-		this.fieldCount = parameters.getInteger(FIELD_COUNT_KEY, 0);
-		this.batchInterval = parameters.getInteger(BATCH_INTERVAL, DEFAULT_BATCH_INTERVERAL);
-
-		@SuppressWarnings("unchecked")
-		Class<Value>[] classes = new Class[this.fieldCount];
-		this.fieldClasses = classes;
-		
-		ClassLoader cl = getClass().getClassLoader();
-
-		try {
-			for (int i = 0; i < this.fieldCount; i++) {
-				Class<? extends Value> clazz = parameters.<Value>getClass(FIELD_TYPE_KEY + i, null, cl);
-				if (clazz == null) {
-					throw new IllegalArgumentException("Invalid configuration for JDBCOutputFormat: "
-							+ "No type class for parameter " + i);
-				}
-				this.fieldClasses[i] = clazz;
-			}
-		}
-		catch (ClassNotFoundException e) {
-			throw new RuntimeException("Could not load data type classes.", e);
-		}
-	}
-
-	/**
-	 * Connects to the target database and initializes the prepared statement.
-	 *
-	 * @param taskNumber The number of the parallel instance.
-	 * @throws IOException Thrown, if the output could not be opened due to an
-	 * I/O problem.
-	 */
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		try {
-			establishConnection();
-			upload = dbConn.prepareStatement(query);
-		} catch (SQLException sqe) {
-			throw new IllegalArgumentException("open() failed:\t!", sqe);
-		} catch (ClassNotFoundException cnfe) {
-			throw new IllegalArgumentException("JDBC-Class not found:\t", cnfe);
-		}
-	}
-
-	private void establishConnection() throws SQLException, ClassNotFoundException {
-		Class.forName(driverName);
-		if (username == null) {
-			dbConn = DriverManager.getConnection(dbURL);
-		} else {
-			dbConn = DriverManager.getConnection(dbURL, username, password);
-		}
-	}
-
-	/**
-	 * Adds a record to the prepared statement.
-	 * <p>
-	 * When this method is called, the output format is guaranteed to be opened.
-	 *
-	 * @param record The records to add to the output.
-	 * @throws IOException Thrown, if the records could not be added due to an
-	 * I/O problem.
-	 */
-	
-	@Override
-	public void writeRecord(Record record) throws IOException {
-		try {
-			for (int x = 0; x < record.getNumFields(); x++) {
-				Value temp = record.getField(x, fieldClasses[x]);
-				addValue(x + 1, temp);
-			}
-			upload.addBatch();
-			batchCount++;
-			if(batchCount >= batchInterval) {
-				upload.executeBatch();
-				batchCount = 0;
-			}
-		} catch (SQLException sqe) {
-			throw new IllegalArgumentException("writeRecord() failed:\t", sqe);
-		} catch (IllegalArgumentException iae) {
-			throw new IllegalArgumentException("writeRecord() failed:\t", iae);
-		}
-	}
-
-	private enum pactType {
-		BooleanValue,
-		ByteValue,
-		CharValue,
-		DoubleValue,
-		FloatValue,
-		IntValue,
-		LongValue,
-		ShortValue,
-		StringValue
-	}
-
-	private void addValue(int index, Value value) throws SQLException {
-		pactType type;
-		try {
-			type = pactType.valueOf(value.getClass().getSimpleName());
-		} catch (IllegalArgumentException iae) {
-			throw new IllegalArgumentException("PactType not supported:\t", iae);
-		}
-		switch (type) {
-			case BooleanValue:
-				upload.setBoolean(index, ((BooleanValue) value).getValue());
-				break;
-			case ByteValue:
-				upload.setByte(index, ((ByteValue) value).getValue());
-				break;
-			case CharValue:
-				upload.setString(index, String.valueOf(((CharValue) value).getValue()));
-				break;
-			case DoubleValue:
-				upload.setDouble(index, ((DoubleValue) value).getValue());
-				break;
-			case FloatValue:
-				upload.setFloat(index, ((FloatValue) value).getValue());
-				break;
-			case IntValue:
-				upload.setInt(index, ((IntValue) value).getValue());
-				break;
-			case LongValue:
-				upload.setLong(index, ((LongValue) value).getValue());
-				break;
-			case ShortValue:
-				upload.setShort(index, ((ShortValue) value).getValue());
-				break;
-			case StringValue:
-				upload.setString(index, ((StringValue) value).getValue());
-				break;
-		}
-	}
-
-	/**
-	 * Executes prepared statement and closes all resources of this instance.
-	 *
-	 * @throws IOException Thrown, if the input could not be closed properly.
-	 */
-	@Override
-	public void close() throws IOException {
-		try {
-			upload.executeBatch();
-			batchCount = 0;
-			upload.close();
-			dbConn.close();
-		} catch (SQLException sqe) {
-			throw new IllegalArgumentException("close() failed:\t", sqe);
-		}
-	}
-
-	/**
-	 * Creates a configuration builder that can be used to set the 
-	 * output format's parameters to the config in a fluent fashion.
-	 * 
-	 * @return A config builder for setting parameters.
-	 */
-	public static ConfigBuilder configureOutputFormat(GenericDataSink target) {
-		return new ConfigBuilder(target.getParameters());
-	}
-
-	/**
-	 * Abstract builder used to set parameters to the output format's 
-	 * configuration in a fluent way.
-	 */
-	protected static abstract class AbstractConfigBuilder<T>
-			extends FileOutputFormat.AbstractConfigBuilder<T> {
-
-		/**
-		 * Creates a new builder for the given configuration.
-		 * 
-		 * @param config The configuration into which the parameters will be written.
-		 */
-		protected AbstractConfigBuilder(Configuration config) {
-			super(config);
-		}
-
-		/**
-		 * Sets the query field.
-		 * @param value value to be set.
-		 * @return The builder itself.
-		 */
-		public T setQuery(String value) {
-			this.config.setString(QUERY_KEY, value);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-
-		/**
-		 * Sets the url field.
-		 * @param value value to be set.
-		 * @return The builder itself.
-		 */
-		public T setUrl(String value) {
-			this.config.setString(URL_KEY, value);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-
-		/**
-		 * Sets the username field.
-		 * @param value value to be set.
-		 * @return The builder itself.
-		 */
-		public T setUsername(String value) {
-			this.config.setString(USERNAME_KEY, value);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-
-		/**
-		 * Sets the password field.
-		 * @param value value to be set.
-		 * @return The builder itself.
-		 */
-		public T setPassword(String value) {
-			this.config.setString(PASSWORD_KEY, value);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-
-		/**
-		 * Sets the driver field.
-		 * @param value value to be set.
-		 * @return The builder itself.
-		 */
-		public T setDriver(String value) {
-			this.config.setString(DRIVER_KEY, value);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-
-		/**
-		 * Sets the type of a column.
-		 * Types are applied in the order they were set.
-		 * @param type PactType to apply.
-		 * @return The builder itself.
-		 */
-		public T setClass(Class<? extends Value> type) {
-			final int numYet = this.config.getInteger(FIELD_COUNT_KEY, 0);
-			this.config.setClass(FIELD_TYPE_KEY + numYet, type);
-			this.config.setInteger(FIELD_COUNT_KEY, numYet + 1);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-	}
-
-	/**
-	 * A builder used to set parameters to the output format's configuration in a fluent way.
-	 */
-	public static final class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> {
-		/**
-		 * Creates a new builder for the given configuration.
-		 * 
-		 * @param targetConfig The configuration into which the parameters will be written.
-		 */
-		protected ConfigBuilder(Configuration targetConfig) {
-			super(targetConfig);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
deleted file mode 100644
index 213fd6a..0000000
--- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc.example;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.record.io.jdbc.JDBCOutputFormat;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.StringValue;
-
-/**
- * Stand-alone example for the JDBC connector.
- *
- * NOTE: To run this example, you need the apache derby code in your classpath.
- * See the Maven file (pom.xml) for a reference to the derby dependency. You can
- * simply Change the scope of the Maven dependency from test to compile.
- */
-public class JDBCExample implements Program, ProgramDescription {
-
-	@Override
-	public Plan getPlan(String[] args) {
-		/*
-		 * In this example we use the constructor where the url contains all the settings that are needed.
-		 * You could also use the default constructor and deliver a Configuration with all the needed settings.
-		 * You also could set the settings to the source-instance.
-		 */
-		GenericDataSource<JDBCInputFormat> source = new GenericDataSource<JDBCInputFormat>(
-				new JDBCInputFormat(
-						"org.apache.derby.jdbc.EmbeddedDriver",
-						"jdbc:derby:memory:ebookshop",
-						"select * from books"),
-				"Data Source");
-
-		GenericDataSink sink = new GenericDataSink(new JDBCOutputFormat(), "Data Output");
-		JDBCOutputFormat.configureOutputFormat(sink)
-				.setDriver("org.apache.derby.jdbc.EmbeddedDriver")
-				.setUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
-				.setClass(IntValue.class)
-				.setClass(StringValue.class)
-				.setClass(StringValue.class)
-				.setClass(FloatValue.class)
-				.setClass(IntValue.class);
-
-		sink.addInput(source);
-		return new Plan(sink, "JDBC Example Job");
-	}
-
-	@Override
-	public String getDescription() {
-		return "Parameter:";
-	}
-
-	/*
-	 * To run this example, you need the apache derby code in your classpath!
-	 */
-	public static void main(String[] args) throws Exception {
-
-		prepareTestDb();
-		JDBCExample tut = new JDBCExample();
-		JobExecutionResult res = LocalExecutor.execute(tut, args);
-		System.out.println("runtime: " + res.getNetRuntime() + " ms");
-
-		System.exit(0);
-	}
-
-	private static void prepareTestDb() throws Exception {
-		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-		Connection conn = DriverManager.getConnection(dbURL);
-
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-
-		sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-
-		sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
-		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
-		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
-		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
-		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
-		stat = conn.createStatement();
-		stat.execute(sqlQueryBuilder.toString());
-		stat.close();
-		
-		conn.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java b/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
deleted file mode 100644
index 172f585..0000000
--- a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.OutputStream;
-
-public class DevNullLogStream {
-
-	public static final OutputStream DEV_NULL = new OutputStream() {
-		public void write(int b) {}
-	};
-	
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java b/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
deleted file mode 100644
index 8e0a2c5..0000000
--- a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.junit.Assert;
-
-import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class JDBCInputFormatTest {
-	JDBCInputFormat jdbcInputFormat;
-	Configuration config;
-	static Connection conn;
-	static final Value[][] dbData = {
-		{new IntValue(1001), new StringValue("Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)},
-		{new IntValue(1002), new StringValue("More Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)},
-		{new IntValue(1003), new StringValue("More Java for more dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new IntValue(33)},
-		{new IntValue(1004), new StringValue("A Cup of Java"), new StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)},
-		{new IntValue(1005), new StringValue("A Teaspoon of Java"), new StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}};
-
-	@BeforeClass
-	public static void setUpClass() {
-		try {
-			prepareDerbyDatabase();
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	private static void prepareDerbyDatabase() throws ClassNotFoundException {
-		System.setProperty("derby.stream.error.field","org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
-		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-		createConnection(dbURL);
-	}
-
-	private static void cleanUpDerbyDatabases() {
-		try {
-				String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-				Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-				conn = DriverManager.getConnection(dbURL);
-				Statement stat = conn.createStatement();
-				stat.executeUpdate("DROP TABLE books");
-				stat.close();
-				conn.close();
-			} catch (Exception e) {
-				e.printStackTrace();
-				Assert.fail();
-			} 
-	}
-	
-	/*
-	 Loads JDBC derby driver ; creates(if necessary) and populates database.
-	 */
-	private static void createConnection(String dbURL) {
-		try {
-			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-			conn = DriverManager.getConnection(dbURL);
-			createTable();
-			insertDataToSQLTables();
-			conn.close();
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	private static void createTable() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-
-		sqlQueryBuilder = new StringBuilder("CREATE TABLE bookscontent (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("content BLOB(10K) DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-	private static void insertDataToSQLTables() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
-		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
-		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
-		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
-		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
-		Statement stat = conn.createStatement();
-		stat.execute(sqlQueryBuilder.toString());
-		stat.close();
-
-		sqlQueryBuilder = new StringBuilder("INSERT INTO bookscontent (id, title, content) VALUES ");
-		sqlQueryBuilder.append("(1001, 'Java for dummies', CAST(X'7f454c4602' AS BLOB)),");
-		sqlQueryBuilder.append("(1002, 'More Java for dummies', CAST(X'7f454c4602' AS BLOB)),");
-		sqlQueryBuilder.append("(1003, 'More Java for more dummies', CAST(X'7f454c4602' AS BLOB)),");
-		sqlQueryBuilder.append("(1004, 'A Cup of Java', CAST(X'7f454c4602' AS BLOB)),");
-		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', CAST(X'7f454c4602' AS BLOB))");
-
-		stat = conn.createStatement();
-		stat.execute(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-
-	@After
-	public void tearDown() {
-		jdbcInputFormat = null;
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidConnection() {
-		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:idontexist", "select * from books;");
-		jdbcInputFormat.configure(null);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidQuery() {
-		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "abc");
-		jdbcInputFormat.configure(null);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidDBType() {
-		jdbcInputFormat = new JDBCInputFormat("idontexist.Driver", "jdbc:derby:memory:ebookshop", "select * from books;");
-		jdbcInputFormat.configure(null);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testUnsupportedSQLType() {
-		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from bookscontent");
-		jdbcInputFormat.configure(null);
-		jdbcInputFormat.nextRecord(new Record());
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testNotConfiguredFormatNext() {
-		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
-		jdbcInputFormat.nextRecord(new Record());
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testNotConfiguredFormatEnd() {
-		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
-		jdbcInputFormat.reachedEnd();
-	}
-
-	@Test
-	public void testJDBCInputFormat() throws IOException {
-		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
-		jdbcInputFormat.configure(null);
-		Record record = new Record();
-		int recordCount = 0;
-		while (!jdbcInputFormat.reachedEnd()) {
-			jdbcInputFormat.nextRecord(record);
-			Assert.assertEquals(5, record.getNumFields());
-			Assert.assertEquals("Field 0 should be int", IntValue.class, record.getField(0, IntValue.class).getClass());
-			Assert.assertEquals("Field 1 should be String", StringValue.class, record.getField(1, StringValue.class).getClass());
-			Assert.assertEquals("Field 2 should be String", StringValue.class, record.getField(2, StringValue.class).getClass());
-			Assert.assertEquals("Field 3 should be float", DoubleValue.class, record.getField(3, DoubleValue.class).getClass());
-			Assert.assertEquals("Field 4 should be int", IntValue.class, record.getField(4, IntValue.class).getClass());
-
-			int[] pos = {0, 1, 2, 3, 4};
-			Value[] values = {new IntValue(), new StringValue(), new StringValue(), new DoubleValue(), new IntValue()};
-			Assert.assertTrue(record.equalsFields(pos, dbData[recordCount], values));
-
-			recordCount++;
-		}
-		Assert.assertEquals(5, recordCount);
-		
-		cleanUpDerbyDatabases();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java b/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
deleted file mode 100644
index c824ea1..0000000
--- a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.junit.Assert;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class JDBCOutputFormatTest {
-	private JDBCInputFormat jdbcInputFormat;
-	private JDBCOutputFormat jdbcOutputFormat;
-
-	private static Connection conn;
-
-	static final Value[][] dbData = {
-		{new IntValue(1001), new StringValue("Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)},
-		{new IntValue(1002), new StringValue("More Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)},
-		{new IntValue(1003), new StringValue("More Java for more dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new IntValue(33)},
-		{new IntValue(1004), new StringValue("A Cup of Java"), new StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)},
-		{new IntValue(1005), new StringValue("A Teaspoon of Java"), new StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}};
-
-	@BeforeClass
-	public static void setUpClass() {
-		try {
-			System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
-			prepareDerbyInputDatabase();
-			prepareDerbyOutputDatabase();
-		} catch (ClassNotFoundException e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	private static void cleanUpDerbyDatabases() {
-		 try {
-			 String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-			 Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-			 conn = DriverManager.getConnection(dbURL);
-			 Statement stat = conn.createStatement();
-			 stat.executeUpdate("DROP TABLE books");
-			 stat.executeUpdate("DROP TABLE newbooks");
-			 stat.close();
-			 conn.close();
-		 } catch (Exception e) {
-			 e.printStackTrace();
-			 Assert.fail();
-		 } 
-	}
-	
-	private static void prepareDerbyInputDatabase() throws ClassNotFoundException {
-		try {
-			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-			conn = DriverManager.getConnection(dbURL);
-			createTableBooks();
-			insertDataToSQLTables();
-			conn.close();
-		} catch (ClassNotFoundException e) {
-			e.printStackTrace();
-			Assert.fail();
-		} catch (SQLException e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	private static void prepareDerbyOutputDatabase() throws ClassNotFoundException {
-		try {
-			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-			conn = DriverManager.getConnection(dbURL);
-			createTableNewBooks();
-			conn.close();
-		} catch (ClassNotFoundException e) {
-			e.printStackTrace();
-			Assert.fail();
-		} catch (SQLException e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	private static void createTableBooks() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-	private static void createTableNewBooks() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-	private static void insertDataToSQLTables() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
-		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
-		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
-		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
-		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
-		Statement stat = conn.createStatement();
-		stat.execute(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-
-	@After
-	public void tearDown() {
-		jdbcOutputFormat = null;
-		cleanUpDerbyDatabases();
-	}
-
-	@Test
-	public void testJDBCOutputFormat() throws IOException {
-		String sourceTable = "books";
-		String targetTable = "newbooks";
-		String driverPath = "org.apache.derby.jdbc.EmbeddedDriver";
-		String dbUrl = "jdbc:derby:memory:ebookshop";
-
-		Configuration cfg = new Configuration();
-		cfg.setString("driver", driverPath);
-		cfg.setString("url", dbUrl);
-		cfg.setString("query", "insert into " + targetTable + " (id, title, author, price, qty) values (?,?,?,?,?)");
-		cfg.setInteger("fields", 5);
-		cfg.setClass("type0", IntValue.class);
-		cfg.setClass("type1", StringValue.class);
-		cfg.setClass("type2", StringValue.class);
-		cfg.setClass("type3", FloatValue.class);
-		cfg.setClass("type4", IntValue.class);
-
-		jdbcOutputFormat = new JDBCOutputFormat();
-		jdbcOutputFormat.configure(cfg);
-		jdbcOutputFormat.open(0,1);
-
-		jdbcInputFormat = new JDBCInputFormat(
-				driverPath,
-				dbUrl,
-				"select * from " + sourceTable);
-		jdbcInputFormat.configure(null);
-
-		Record record = new Record();
-		while (!jdbcInputFormat.reachedEnd()) {
-			jdbcInputFormat.nextRecord(record);
-			jdbcOutputFormat.writeRecord(record);
-		}
-
-		jdbcOutputFormat.close();
-		jdbcInputFormat.close();
-
-		jdbcInputFormat = new JDBCInputFormat(
-				driverPath,
-				dbUrl,
-				"select * from " + targetTable);
-		jdbcInputFormat.configure(null);
-
-		int recordCount = 0;
-		while (!jdbcInputFormat.reachedEnd()) {
-			jdbcInputFormat.nextRecord(record);
-			Assert.assertEquals(5, record.getNumFields());
-			Assert.assertEquals("Field 0 should be int", IntValue.class, record.getField(0, IntValue.class).getClass());
-			Assert.assertEquals("Field 1 should be String", StringValue.class, record.getField(1, StringValue.class).getClass());
-			Assert.assertEquals("Field 2 should be String", StringValue.class, record.getField(2, StringValue.class).getClass());
-			Assert.assertEquals("Field 3 should be float", DoubleValue.class, record.getField(3, DoubleValue.class).getClass());
-			Assert.assertEquals("Field 4 should be int", IntValue.class, record.getField(4, IntValue.class).getClass());
-
-			int[] pos = {0, 1, 2, 3, 4};
-			Value[] values = {new IntValue(), new StringValue(), new StringValue(), new DoubleValue(), new IntValue()};
-			Assert.assertTrue(record.equalsFields(pos, dbData[recordCount], values));
-
-			recordCount++;
-		}
-		Assert.assertEquals(5, recordCount);
-
-		jdbcInputFormat.close();
-	}
-}


[3/3] flink git commit: [FLINK-2442] [fix] FieldPositionKeys support Pojo fields

Posted by fh...@apache.org.
[FLINK-2442] [fix] FieldPositionKeys support Pojo fields

This closes #963


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30761572
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30761572
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30761572

Branch: refs/heads/master
Commit: 30761572b5040669b07d261ec9b109797debc549
Parents: b2d8c40
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Jul 30 21:44:06 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Aug 4 18:16:30 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/api/java/operators/Keys.java   | 50 ++++++++++----------
 .../api/java/typeutils/TupleTypeInfoBase.java   | 20 --------
 .../flink/api/java/operators/KeysTest.java      | 27 +++++++++++
 3 files changed, 52 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/30761572/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 69d306f..09874e5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -223,43 +223,43 @@ public abstract class Keys<T> {
 			} else {
 				groupingFields = rangeCheckFields(groupingFields, type.getArity() -1);
 			}
-			CompositeType<?> compositeType = (CompositeType<?>) type;
 			Preconditions.checkArgument(groupingFields.length > 0, "Grouping fields can not be empty at this point");
 			
 			keyFields = new ArrayList<FlatFieldDescriptor>(type.getTotalFields());
 			// for each key, find the field:
 			for(int j = 0; j < groupingFields.length; j++) {
+				int keyPos = groupingFields[j];
+
+				int offset = 0;
 				for(int i = 0; i < type.getArity(); i++) {
-					TypeInformation<?> fieldType = compositeType.getTypeAt(i);
-					
-					if(groupingFields[j] == i) { // check if user set the key
-						int keyId = countNestedElementsBefore(compositeType, i) + i;
-						if(fieldType instanceof TupleTypeInfoBase) {
-							TupleTypeInfoBase<?> tupleFieldType = (TupleTypeInfoBase<?>) fieldType;
-							tupleFieldType.addAllFields(keyId, keyFields);
-						} else {
-							Preconditions.checkArgument(fieldType instanceof AtomicType, "Wrong field type");
-							keyFields.add(new FlatFieldDescriptor(keyId, fieldType));
+
+					TypeInformation fieldType = ((CompositeType<?>) type).getTypeAt(i);
+					if(i < keyPos) {
+						// not yet there, increment key offset
+						offset += fieldType.getTotalFields();
+					}
+					else {
+						// arrived at key position
+						if(fieldType instanceof CompositeType) {
+							// add all nested fields of composite type
+							((CompositeType) fieldType).getFlatFields("*", offset, keyFields);
 						}
-						
+						else if(fieldType instanceof AtomicType) {
+							// add atomic type field
+							keyFields.add(new FlatFieldDescriptor(offset, fieldType));
+						}
+						else {
+							// type should either be composite or atomic
+							throw new InvalidProgramException("Field type is neither CompositeType nor AtomicType: "+fieldType);
+						}
+						// go to next key
+						break;
 					}
 				}
 			}
 			keyFields = removeNullElementsFromList(keyFields);
 		}
-		
-		private static int countNestedElementsBefore(CompositeType<?> compositeType, int pos) {
-			if( pos == 0) {
-				return 0;
-			}
-			int ret = 0;
-			for (int i = 0; i < pos; i++) {
-				TypeInformation<?> fieldType = compositeType.getTypeAt(i);
-				ret += fieldType.getTotalFields() -1;
-			}
-			return ret;
-		}
-		
+
 		public static <R> List<R> removeNullElementsFromList(List<R> in) {
 			List<R> elements = new ArrayList<R>();
 			for(R e: in) {

http://git-wip-us.apache.org/repos/asf/flink/blob/30761572/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index 3314ca9..881e690 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
@@ -88,25 +87,6 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
 		return tupleType;
 	}
 
-	/**
-	 * Recursively add all fields in this tuple type. We need this in particular to get all
-	 * the types.
-	 * @param startKeyId
-	 * @param keyFields
-	 */
-	public void addAllFields(int startKeyId, List<FlatFieldDescriptor> keyFields) {
-		for(int i = 0; i < this.getArity(); i++) {
-			TypeInformation<?> type = this.types[i];
-			if(type instanceof AtomicType) {
-				keyFields.add(new FlatFieldDescriptor(startKeyId, type));
-			} else if(type instanceof TupleTypeInfoBase<?>) {
-				TupleTypeInfoBase<?> ttb = (TupleTypeInfoBase<?>) type;
-				ttb.addAllFields(startKeyId, keyFields);
-			}
-			startKeyId += type.getTotalFields();
-		}
-	}
-
 	@Override
 	public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/30761572/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
index 67d0240..cf8936d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest.ComplexNestedClass;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.junit.Assert;
@@ -254,4 +255,30 @@ public class KeysTest {
 		ek = new ExpressionKeys<PojoWithMultiplePojos>(new String[]{"i0"}, ti);
 		Assert.assertArrayEquals(new int[] {0}, ek.computeLogicalKeyPositions());
 	}
+
+	@Test
+	public void testTupleWithNestedPojo() {
+
+		TypeInformation<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>> ti =
+				new TupleTypeInfo<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(
+					BasicTypeInfo.INT_TYPE_INFO,
+					TypeExtractor.getForClass(Pojo1.class),
+					TypeExtractor.getForClass(PojoWithMultiplePojos.class)
+				);
+
+		ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>> ek;
+
+		ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{0}, ti);
+		Assert.assertArrayEquals(new int[] {0}, ek.computeLogicalKeyPositions());
+
+		ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{1}, ti);
+		Assert.assertArrayEquals(new int[] {1,2}, ek.computeLogicalKeyPositions());
+
+		ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{2}, ti);
+		Assert.assertArrayEquals(new int[] {3,4,5,6,7}, ek.computeLogicalKeyPositions());
+
+		ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{}, ti, true);
+		Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6,7}, ek.computeLogicalKeyPositions());
+
+	}
 }


[2/3] flink git commit: [FLINK-2205] Fix confusing entries in JobManager WebUI JobConfig section.

Posted by fh...@apache.org.
[FLINK-2205] Fix confusing entries in JobManager WebUI JobConfig section.

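The entry 'Number of execution retries' is renamed to 'Max. number of
execution retries'. When its value is -1 (the default), it is now displayed
as 'deactivated'; likewise, a 'Job parallelism' of -1 is displayed as 'auto',
as suggested in JIRA.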

This closes #927


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b2d8c40a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b2d8c40a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b2d8c40a

Branch: refs/heads/master
Commit: b2d8c40a06e0a36e90913d316ff2b003b701fee1
Parents: 06b37bf
Author: Enrique Bautista <eb...@gmail.com>
Authored: Tue Jul 21 18:52:05 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Aug 4 18:14:52 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/web/JobManagerInfoServlet.java      | 2 +-
 .../src/main/resources/web-docs-infoserver/js/analyzer.js        | 4 ++++
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b2d8c40a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
index ce57714..0ecc941 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -436,7 +436,7 @@ public class JobManagerInfoServlet extends HttpServlet {
 			if(ec != null) {
 				wrt.write("\"executionConfig\": {");
 				wrt.write("\"Execution Mode\": \""+ec.getExecutionMode()+"\",");
-				wrt.write("\"Number of execution retries\": \""+ec.getNumberOfExecutionRetries()+"\",");
+				wrt.write("\"Max. number of execution retries\": \""+ec.getNumberOfExecutionRetries()+"\",");
 				wrt.write("\"Job parallelism\": \""+ec.getParallelism()+"\",");
 				wrt.write("\"Object reuse mode\": \""+ec.isObjectReuseEnabled()+"\"");
 				ExecutionConfig.GlobalJobParameters uc = ec.getGlobalJobParameters();

http://git-wip-us.apache.org/repos/asf/flink/blob/b2d8c40a/flink-runtime/src/main/resources/web-docs-infoserver/js/analyzer.js
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/js/analyzer.js b/flink-runtime/src/main/resources/web-docs-infoserver/js/analyzer.js
index 3934017..4030f80 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/js/analyzer.js
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/js/analyzer.js
@@ -107,6 +107,10 @@ function analyzeTime(json, stacked) {
 		$.each(job.executionConfig, function(key, value) {
 			if(key == "userConfig") {
 				return;
+			} else if(key == "Max. number of execution retries" && value == -1) {
+				value = "deactivated";
+			} else if(key == "Job parallelism" && value == -1) {
+				value = "auto";
 			}
 			configTable += "<tr><td>"+key+"</td><td>"+value+"</td></tr>";
 		});