You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/08/04 20:59:59 UTC
[1/3] flink git commit: [FLINK-2005] Remove Record API from jdbc module
Repository: flink
Updated Branches:
refs/heads/master d570d078a -> 30761572b
[FLINK-2005] Remove Record API from jdbc module
This closes #982
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06b37bf5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06b37bf5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06b37bf5
Branch: refs/heads/master
Commit: 06b37bf550315bd1d5be7dc3ed6638fd21768e1a
Parents: d570d07
Author: zentol <s....@web.de>
Authored: Tue Aug 4 12:45:22 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Aug 4 18:13:30 2015 +0200
----------------------------------------------------------------------
flink-staging/flink-jdbc/pom.xml | 12 -
.../java/record/io/jdbc/JDBCInputFormat.java | 389 -------------------
.../java/record/io/jdbc/JDBCOutputFormat.java | 359 -----------------
.../record/io/jdbc/example/JDBCExample.java | 136 -------
.../java/record/io/jdbc/DevNullLogStream.java | 30 --
.../record/io/jdbc/JDBCInputFormatTest.java | 214 ----------
.../record/io/jdbc/JDBCOutputFormatTest.java | 225 -----------
7 files changed, 1365 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/pom.xml b/flink-staging/flink-jdbc/pom.xml
index 7b499a7..a3976c1 100644
--- a/flink-staging/flink-jdbc/pom.xml
+++ b/flink-staging/flink-jdbc/pom.xml
@@ -41,18 +41,6 @@ under the License.
<artifactId>flink-java</artifactId>
<version>${project.version}</version>
</dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.derby</groupId>
http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
deleted file mode 100644
index 3cd295b..0000000
--- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.io.NonParallelInput;
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.StringValue;
-
-/**
- * InputFormat to read data from a database and generate PactReords.
- * The InputFormat has to be configured with the query, and either all
- * connection parameters or a complete database URL.{@link Configuration} The position of a value inside a Record is
- * determined by the table
- * returned.
- *
- * @see Configuration
- * @see Record
- * @see DriverManager
- */
-public class JDBCInputFormat extends GenericInputFormat implements NonParallelInput {
-
- private static final long serialVersionUID = 1L;
-
- @SuppressWarnings("unused")
- private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
-
-
- public final String DRIVER_KEY = "driver";
- public final String USERNAME_KEY = "username";
- public final String PASSWORD_KEY = "password";
- public final String URL_KEY = "url";
- public final String QUERY_KEY = "query";
-
-
- private String username;
- private String password;
- private String driverName;
- private String dbURL;
- private String query;
-
-
- private transient Connection dbConn;
- private transient Statement statement;
- private transient ResultSet resultSet;
-
-
- /**
- * Creates a non-configured JDBCInputFormat. This format has to be
- * configured using configure(configuration).
- */
- public JDBCInputFormat() {}
-
- /**
- * Creates a JDBCInputFormat and configures it.
- *
- * @param driverName
- * JDBC-Drivename
- * @param dbURL
- * Formatted URL containing all connection parameters.
- * @param username
- * @param password
- * @param query
- * Query to execute.
- */
- public JDBCInputFormat(String driverName, String dbURL, String username, String password, String query) {
- this.driverName = driverName;
- this.query = query;
- this.dbURL = dbURL;
- this.username = username;
- this.password = password;
- }
-
- /**
- * Creates a JDBCInputFormat and configures it.
- *
- * @param driverName
- * JDBC-Drivername
- * @param dbURL
- * Formatted URL containing all connection parameters.
- * @param query
- * Query to execute.
- */
- public JDBCInputFormat(String driverName, String dbURL, String query) {
- this(driverName, dbURL, "", "", query);
- }
-
- /**
- * Creates a JDBCInputFormat and configures it.
- *
- * @param parameters
- * Configuration with all connection parameters.
- * @param query
- * Query to execute.
- */
- public JDBCInputFormat(Configuration parameters, String query) {
- this.driverName = parameters.getString(DRIVER_KEY, "");
- this.username = parameters.getString(USERNAME_KEY, "");
- this.password = parameters.getString(PASSWORD_KEY, "");
- this.dbURL = parameters.getString(URL_KEY, "");
- this.query = query;
- }
-
-
- /**
- * Configures this JDBCInputFormat. This includes setting the connection
- * parameters (if necessary), establishing the connection and executing the
- * query.
- *
- * @param parameters
- * Configuration containing all or no parameters.
- */
- @Override
- public void configure(Configuration parameters) {
- boolean needConfigure = isFieldNullOrEmpty(this.query) || isFieldNullOrEmpty(this.dbURL);
- if (needConfigure) {
- this.driverName = parameters.getString(DRIVER_KEY, null);
- this.username = parameters.getString(USERNAME_KEY, null);
- this.password = parameters.getString(PASSWORD_KEY, null);
- this.query = parameters.getString(QUERY_KEY, null);
- this.dbURL = parameters.getString(URL_KEY, null);
- }
-
- try {
- prepareQueryExecution();
- } catch (SQLException e) {
- throw new IllegalArgumentException("Configure failed:\t!", e);
- }
- }
-
- /**
- * Enters data value from the current resultSet into a Record.
- *
- * @param pos
- * Record position to be set.
- * @param type
- * SQL type of the resultSet value.
- * @param record
- * Target Record.
- */
- private void retrieveTypeAndFillRecord(int pos, int type, Record record) throws SQLException,
- NotTransformableSQLFieldException {
- switch (type) {
- case java.sql.Types.NULL:
- record.setField(pos, NullValue.getInstance());
- break;
- case java.sql.Types.BOOLEAN:
- record.setField(pos, new BooleanValue(resultSet.getBoolean(pos + 1)));
- break;
- case java.sql.Types.BIT:
- record.setField(pos, new BooleanValue(resultSet.getBoolean(pos + 1)));
- break;
- case java.sql.Types.CHAR:
- record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
- break;
- case java.sql.Types.NCHAR:
- record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
- break;
- case java.sql.Types.VARCHAR:
- record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
- break;
- case java.sql.Types.LONGVARCHAR:
- record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
- break;
- case java.sql.Types.LONGNVARCHAR:
- record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
- break;
- case java.sql.Types.TINYINT:
- record.setField(pos, new ShortValue(resultSet.getShort(pos + 1)));
- break;
- case java.sql.Types.SMALLINT:
- record.setField(pos, new ShortValue(resultSet.getShort(pos + 1)));
- break;
- case java.sql.Types.BIGINT:
- record.setField(pos, new LongValue(resultSet.getLong(pos + 1)));
- break;
- case java.sql.Types.INTEGER:
- record.setField(pos, new IntValue(resultSet.getInt(pos + 1)));
- break;
- case java.sql.Types.FLOAT:
- record.setField(pos, new DoubleValue(resultSet.getDouble(pos + 1)));
- break;
- case java.sql.Types.REAL:
- record.setField(pos, new FloatValue(resultSet.getFloat(pos + 1)));
- break;
- case java.sql.Types.DOUBLE:
- record.setField(pos, new DoubleValue(resultSet.getDouble(pos + 1)));
- break;
- case java.sql.Types.DECIMAL:
- record.setField(pos, new DoubleValue(resultSet.getBigDecimal(pos + 1).doubleValue()));
- break;
- case java.sql.Types.NUMERIC:
- record.setField(pos, new DoubleValue(resultSet.getBigDecimal(pos + 1).doubleValue()));
- break;
- case java.sql.Types.DATE:
- record.setField(pos, new StringValue(resultSet.getDate(pos + 1).toString()));
- break;
- case java.sql.Types.TIME:
- record.setField(pos, new LongValue(resultSet.getTime(pos + 1).getTime()));
- break;
- case java.sql.Types.TIMESTAMP:
- record.setField(pos, new StringValue(resultSet.getTimestamp(pos + 1).toString()));
- break;
- case java.sql.Types.SQLXML:
- record.setField(pos, new StringValue(resultSet.getSQLXML(pos + 1).toString()));
- break;
- default:
- throw new NotTransformableSQLFieldException("Unknown sql-type [" + type + "]on column [" + pos + "]");
-
- // case java.sql.Types.BINARY:
- // case java.sql.Types.VARBINARY:
- // case java.sql.Types.LONGVARBINARY:
- // case java.sql.Types.ARRAY:
- // case java.sql.Types.JAVA_OBJECT:
- // case java.sql.Types.BLOB:
- // case java.sql.Types.CLOB:
- // case java.sql.Types.NCLOB:
- // case java.sql.Types.DATALINK:
- // case java.sql.Types.DISTINCT:
- // case java.sql.Types.OTHER:
- // case java.sql.Types.REF:
- // case java.sql.Types.ROWID:
- // case java.sql.Types.STRUCT:
- }
- }
-
- private boolean isFieldNullOrEmpty(String field) {
- return (field == null || field.length() == 0);
- }
-
- private void prepareQueryExecution() throws SQLException {
- setClassForDBType();
- prepareCredentialsAndExecute();
- }
-
- /**
- * Loads appropriate JDBC driver.
- *
- * @param dbType
- * Type of the database.
- * @return boolean value, indication whether an appropriate driver could be
- * found.
- */
- private void setClassForDBType() {
- try {
- Class.forName(driverName);
- } catch (ClassNotFoundException cnfe) {
- throw new IllegalArgumentException("JDBC-Class not found:\t" + cnfe.getLocalizedMessage());
- }
- }
-
- private void prepareCredentialsAndExecute() throws SQLException {
- if (isFieldNullOrEmpty(username)) {
- prepareConnection(dbURL);
- } else {
- prepareConnection();
- }
- executeQuery();
- }
-
- /**
- * Establishes a connection to a database.
- *
- * @param dbURL
- * Assembled URL containing all connection parameters.
- * @return boolean value, indicating whether a connection could be
- * established
- */
- private void prepareConnection(String dbURL) throws SQLException {
- dbConn = DriverManager.getConnection(dbURL);
- }
-
- /**
- * Assembles the Database URL and establishes a connection.
- *
- * @param dbType
- * Type of the database.
- * @param username
- * Login username.
- * @param password
- * Login password.
- * @return boolean value, indicating whether a connection could be
- * established
- */
- private void prepareConnection() throws SQLException {
- dbConn = DriverManager.getConnection(dbURL, username, password);
- }
-
- private void executeQuery() throws SQLException {
- statement = dbConn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
- resultSet = statement.executeQuery(this.query);
- }
-
- /**
- * Checks whether all data has been read.
- *
- * @return boolean value indication whether all data has been read.
- */
- @Override
- public boolean reachedEnd() {
- try {
- if (resultSet.isLast()) {
- resultSet.close();
- statement.close();
- dbConn.close();
- return true;
- } else {
- return false;
- }
- } catch (SQLException e) {
- throw new IllegalArgumentException("Couldn't evaluate reachedEnd():\t" + e.getMessage());
- } catch (NullPointerException e) {
- throw new IllegalArgumentException("Couldn't access resultSet:\t" + e.getMessage());
- }
- }
-
- /**
- * Stores the next resultSet row in a Record
- *
- * @param record
- * target Record
- * @return boolean value indicating that the operation was successful
- */
- @Override
- public Record nextRecord(Record record) {
- try {
- resultSet.next();
- ResultSetMetaData rsmd = resultSet.getMetaData();
- int column_count = rsmd.getColumnCount();
- record.setNumFields(column_count);
-
- for (int pos = 0; pos < column_count; pos++) {
- int type = rsmd.getColumnType(pos + 1);
- retrieveTypeAndFillRecord(pos, type, record);
- }
- return record;
- } catch (SQLException e) {
- throw new IllegalArgumentException("Couldn't read data:\t" + e.getMessage());
- } catch (NotTransformableSQLFieldException e) {
- throw new IllegalArgumentException("Couldn't read data because of unknown column sql-type:\t"
- + e.getMessage());
- } catch (NullPointerException e) {
- throw new IllegalArgumentException("Couldn't access resultSet:\t" + e.getMessage());
- }
- }
-
- public static class NotTransformableSQLFieldException extends Exception {
-
- private static final long serialVersionUID = 1L;
-
- public NotTransformableSQLFieldException(String message) {
- super(message);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
deleted file mode 100644
index 780001a..0000000
--- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.CharValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-
-public class JDBCOutputFormat implements OutputFormat<Record> {
- private static final long serialVersionUID = 1L;
-
- private static final int DEFAULT_BATCH_INTERVERAL = 5000;
-
- public static final String DRIVER_KEY = "driver";
- public static final String USERNAME_KEY = "username";
- public static final String PASSWORD_KEY = "password";
- public static final String URL_KEY = "url";
- public static final String QUERY_KEY = "query";
- public static final String FIELD_COUNT_KEY = "fields";
- public static final String FIELD_TYPE_KEY = "type";
- public static final String BATCH_INTERVAL = "batchInt";
-
- private Connection dbConn;
- private PreparedStatement upload;
-
- private String username;
- private String password;
- private String driverName;
- private String dbURL;
-
- private String query;
- private int fieldCount;
- private Class<? extends Value>[] fieldClasses;
-
- /**
- * Variable indicating the current number of insert sets in a batch.
- */
- private int batchCount = 0;
-
- /**
- * Commit interval of batches.
- * High batch interval: faster inserts, more memory required (reduce if OutOfMemoryExceptions occur)
- * low batch interval: slower inserts, less memory.
- */
- private int batchInterval = DEFAULT_BATCH_INTERVERAL;
-
-
- /**
- * Configures this JDBCOutputFormat.
- *
- * @param parameters
- * Configuration containing all parameters.
- */
- @Override
- public void configure(Configuration parameters) {
- this.driverName = parameters.getString(DRIVER_KEY, null);
- this.username = parameters.getString(USERNAME_KEY, null);
- this.password = parameters.getString(PASSWORD_KEY, null);
- this.dbURL = parameters.getString(URL_KEY, null);
- this.query = parameters.getString(QUERY_KEY, null);
- this.fieldCount = parameters.getInteger(FIELD_COUNT_KEY, 0);
- this.batchInterval = parameters.getInteger(BATCH_INTERVAL, DEFAULT_BATCH_INTERVERAL);
-
- @SuppressWarnings("unchecked")
- Class<Value>[] classes = new Class[this.fieldCount];
- this.fieldClasses = classes;
-
- ClassLoader cl = getClass().getClassLoader();
-
- try {
- for (int i = 0; i < this.fieldCount; i++) {
- Class<? extends Value> clazz = parameters.<Value>getClass(FIELD_TYPE_KEY + i, null, cl);
- if (clazz == null) {
- throw new IllegalArgumentException("Invalid configuration for JDBCOutputFormat: "
- + "No type class for parameter " + i);
- }
- this.fieldClasses[i] = clazz;
- }
- }
- catch (ClassNotFoundException e) {
- throw new RuntimeException("Could not load data type classes.", e);
- }
- }
-
- /**
- * Connects to the target database and initializes the prepared statement.
- *
- * @param taskNumber The number of the parallel instance.
- * @throws IOException Thrown, if the output could not be opened due to an
- * I/O problem.
- */
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
- try {
- establishConnection();
- upload = dbConn.prepareStatement(query);
- } catch (SQLException sqe) {
- throw new IllegalArgumentException("open() failed:\t!", sqe);
- } catch (ClassNotFoundException cnfe) {
- throw new IllegalArgumentException("JDBC-Class not found:\t", cnfe);
- }
- }
-
- private void establishConnection() throws SQLException, ClassNotFoundException {
- Class.forName(driverName);
- if (username == null) {
- dbConn = DriverManager.getConnection(dbURL);
- } else {
- dbConn = DriverManager.getConnection(dbURL, username, password);
- }
- }
-
- /**
- * Adds a record to the prepared statement.
- * <p>
- * When this method is called, the output format is guaranteed to be opened.
- *
- * @param record The records to add to the output.
- * @throws IOException Thrown, if the records could not be added due to an
- * I/O problem.
- */
-
- @Override
- public void writeRecord(Record record) throws IOException {
- try {
- for (int x = 0; x < record.getNumFields(); x++) {
- Value temp = record.getField(x, fieldClasses[x]);
- addValue(x + 1, temp);
- }
- upload.addBatch();
- batchCount++;
- if(batchCount >= batchInterval) {
- upload.executeBatch();
- batchCount = 0;
- }
- } catch (SQLException sqe) {
- throw new IllegalArgumentException("writeRecord() failed:\t", sqe);
- } catch (IllegalArgumentException iae) {
- throw new IllegalArgumentException("writeRecord() failed:\t", iae);
- }
- }
-
- private enum pactType {
- BooleanValue,
- ByteValue,
- CharValue,
- DoubleValue,
- FloatValue,
- IntValue,
- LongValue,
- ShortValue,
- StringValue
- }
-
- private void addValue(int index, Value value) throws SQLException {
- pactType type;
- try {
- type = pactType.valueOf(value.getClass().getSimpleName());
- } catch (IllegalArgumentException iae) {
- throw new IllegalArgumentException("PactType not supported:\t", iae);
- }
- switch (type) {
- case BooleanValue:
- upload.setBoolean(index, ((BooleanValue) value).getValue());
- break;
- case ByteValue:
- upload.setByte(index, ((ByteValue) value).getValue());
- break;
- case CharValue:
- upload.setString(index, String.valueOf(((CharValue) value).getValue()));
- break;
- case DoubleValue:
- upload.setDouble(index, ((DoubleValue) value).getValue());
- break;
- case FloatValue:
- upload.setFloat(index, ((FloatValue) value).getValue());
- break;
- case IntValue:
- upload.setInt(index, ((IntValue) value).getValue());
- break;
- case LongValue:
- upload.setLong(index, ((LongValue) value).getValue());
- break;
- case ShortValue:
- upload.setShort(index, ((ShortValue) value).getValue());
- break;
- case StringValue:
- upload.setString(index, ((StringValue) value).getValue());
- break;
- }
- }
-
- /**
- * Executes prepared statement and closes all resources of this instance.
- *
- * @throws IOException Thrown, if the input could not be closed properly.
- */
- @Override
- public void close() throws IOException {
- try {
- upload.executeBatch();
- batchCount = 0;
- upload.close();
- dbConn.close();
- } catch (SQLException sqe) {
- throw new IllegalArgumentException("close() failed:\t", sqe);
- }
- }
-
- /**
- * Creates a configuration builder that can be used to set the
- * output format's parameters to the config in a fluent fashion.
- *
- * @return A config builder for setting parameters.
- */
- public static ConfigBuilder configureOutputFormat(GenericDataSink target) {
- return new ConfigBuilder(target.getParameters());
- }
-
- /**
- * Abstract builder used to set parameters to the output format's
- * configuration in a fluent way.
- */
- protected static abstract class AbstractConfigBuilder<T>
- extends FileOutputFormat.AbstractConfigBuilder<T> {
-
- /**
- * Creates a new builder for the given configuration.
- *
- * @param config The configuration into which the parameters will be written.
- */
- protected AbstractConfigBuilder(Configuration config) {
- super(config);
- }
-
- /**
- * Sets the query field.
- * @param value value to be set.
- * @return The builder itself.
- */
- public T setQuery(String value) {
- this.config.setString(QUERY_KEY, value);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
-
- /**
- * Sets the url field.
- * @param value value to be set.
- * @return The builder itself.
- */
- public T setUrl(String value) {
- this.config.setString(URL_KEY, value);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
-
- /**
- * Sets the username field.
- * @param value value to be set.
- * @return The builder itself.
- */
- public T setUsername(String value) {
- this.config.setString(USERNAME_KEY, value);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
-
- /**
- * Sets the password field.
- * @param value value to be set.
- * @return The builder itself.
- */
- public T setPassword(String value) {
- this.config.setString(PASSWORD_KEY, value);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
-
- /**
- * Sets the driver field.
- * @param value value to be set.
- * @return The builder itself.
- */
- public T setDriver(String value) {
- this.config.setString(DRIVER_KEY, value);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
-
- /**
- * Sets the type of a column.
- * Types are applied in the order they were set.
- * @param type PactType to apply.
- * @return The builder itself.
- */
- public T setClass(Class<? extends Value> type) {
- final int numYet = this.config.getInteger(FIELD_COUNT_KEY, 0);
- this.config.setClass(FIELD_TYPE_KEY + numYet, type);
- this.config.setInteger(FIELD_COUNT_KEY, numYet + 1);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
- }
-
- /**
- * A builder used to set parameters to the output format's configuration in a fluent way.
- */
- public static final class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> {
- /**
- * Creates a new builder for the given configuration.
- *
- * @param targetConfig The configuration into which the parameters will be written.
- */
- protected ConfigBuilder(Configuration targetConfig) {
- super(targetConfig);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
deleted file mode 100644
index 213fd6a..0000000
--- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc.example;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.record.io.jdbc.JDBCOutputFormat;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.StringValue;
-
-/**
- * Stand-alone example for the JDBC connector.
- *
- * NOTE: To run this example, you need the apache derby code in your classpath.
- * See the Maven file (pom.xml) for a reference to the derby dependency. You can
- * simply Change the scope of the Maven dependency from test to compile.
- */
-public class JDBCExample implements Program, ProgramDescription {
-
- @Override
- public Plan getPlan(String[] args) {
- /*
- * In this example we use the constructor where the url contains all the settings that are needed.
- * You could also use the default constructor and deliver a Configuration with all the needed settings.
- * You also could set the settings to the source-instance.
- */
- GenericDataSource<JDBCInputFormat> source = new GenericDataSource<JDBCInputFormat>(
- new JDBCInputFormat(
- "org.apache.derby.jdbc.EmbeddedDriver",
- "jdbc:derby:memory:ebookshop",
- "select * from books"),
- "Data Source");
-
- GenericDataSink sink = new GenericDataSink(new JDBCOutputFormat(), "Data Output");
- JDBCOutputFormat.configureOutputFormat(sink)
- .setDriver("org.apache.derby.jdbc.EmbeddedDriver")
- .setUrl("jdbc:derby:memory:ebookshop")
- .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
- .setClass(IntValue.class)
- .setClass(StringValue.class)
- .setClass(StringValue.class)
- .setClass(FloatValue.class)
- .setClass(IntValue.class);
-
- sink.addInput(source);
- return new Plan(sink, "JDBC Example Job");
- }
-
- @Override
- public String getDescription() {
- return "Parameter:";
- }
-
- /*
- * To run this example, you need the apache derby code in your classpath!
- */
- public static void main(String[] args) throws Exception {
-
- prepareTestDb();
- JDBCExample tut = new JDBCExample();
- JobExecutionResult res = LocalExecutor.execute(tut, args);
- System.out.println("runtime: " + res.getNetRuntime() + " ms");
-
- System.exit(0);
- }
-
- private static void prepareTestDb() throws Exception {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- Connection conn = DriverManager.getConnection(dbURL);
-
- StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- Statement stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
-
- sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
-
- sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
- sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
- sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
- sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
- sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
- sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
- stat = conn.createStatement();
- stat.execute(sqlQueryBuilder.toString());
- stat.close();
-
- conn.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java b/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
deleted file mode 100644
index 172f585..0000000
--- a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.OutputStream;
-
-public class DevNullLogStream {
-
- public static final OutputStream DEV_NULL = new OutputStream() {
- public void write(int b) {}
- };
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java b/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
deleted file mode 100644
index 8e0a2c5..0000000
--- a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.junit.Assert;
-
-import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class JDBCInputFormatTest {
- JDBCInputFormat jdbcInputFormat;
- Configuration config;
- static Connection conn;
- static final Value[][] dbData = {
- {new IntValue(1001), new StringValue("Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)},
- {new IntValue(1002), new StringValue("More Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)},
- {new IntValue(1003), new StringValue("More Java for more dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new IntValue(33)},
- {new IntValue(1004), new StringValue("A Cup of Java"), new StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)},
- {new IntValue(1005), new StringValue("A Teaspoon of Java"), new StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}};
-
- @BeforeClass
- public static void setUpClass() {
- try {
- prepareDerbyDatabase();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void prepareDerbyDatabase() throws ClassNotFoundException {
- System.setProperty("derby.stream.error.field","org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- createConnection(dbURL);
- }
-
- private static void cleanUpDerbyDatabases() {
- try {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- conn = DriverManager.getConnection(dbURL);
- Statement stat = conn.createStatement();
- stat.executeUpdate("DROP TABLE books");
- stat.close();
- conn.close();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- /*
- Loads JDBC derby driver ; creates(if necessary) and populates database.
- */
- private static void createConnection(String dbURL) {
- try {
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- conn = DriverManager.getConnection(dbURL);
- createTable();
- insertDataToSQLTables();
- conn.close();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void createTable() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- Statement stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
-
- sqlQueryBuilder = new StringBuilder("CREATE TABLE bookscontent (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("content BLOB(10K) DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
- }
-
- private static void insertDataToSQLTables() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
- sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
- sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
- sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
- sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
- sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
- Statement stat = conn.createStatement();
- stat.execute(sqlQueryBuilder.toString());
- stat.close();
-
- sqlQueryBuilder = new StringBuilder("INSERT INTO bookscontent (id, title, content) VALUES ");
- sqlQueryBuilder.append("(1001, 'Java for dummies', CAST(X'7f454c4602' AS BLOB)),");
- sqlQueryBuilder.append("(1002, 'More Java for dummies', CAST(X'7f454c4602' AS BLOB)),");
- sqlQueryBuilder.append("(1003, 'More Java for more dummies', CAST(X'7f454c4602' AS BLOB)),");
- sqlQueryBuilder.append("(1004, 'A Cup of Java', CAST(X'7f454c4602' AS BLOB)),");
- sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', CAST(X'7f454c4602' AS BLOB))");
-
- stat = conn.createStatement();
- stat.execute(sqlQueryBuilder.toString());
- stat.close();
- }
-
-
- @After
- public void tearDown() {
- jdbcInputFormat = null;
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidConnection() {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:idontexist", "select * from books;");
- jdbcInputFormat.configure(null);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidQuery() {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "abc");
- jdbcInputFormat.configure(null);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidDBType() {
- jdbcInputFormat = new JDBCInputFormat("idontexist.Driver", "jdbc:derby:memory:ebookshop", "select * from books;");
- jdbcInputFormat.configure(null);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testUnsupportedSQLType() {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from bookscontent");
- jdbcInputFormat.configure(null);
- jdbcInputFormat.nextRecord(new Record());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testNotConfiguredFormatNext() {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
- jdbcInputFormat.nextRecord(new Record());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testNotConfiguredFormatEnd() {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
- jdbcInputFormat.reachedEnd();
- }
-
- @Test
- public void testJDBCInputFormat() throws IOException {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
- jdbcInputFormat.configure(null);
- Record record = new Record();
- int recordCount = 0;
- while (!jdbcInputFormat.reachedEnd()) {
- jdbcInputFormat.nextRecord(record);
- Assert.assertEquals(5, record.getNumFields());
- Assert.assertEquals("Field 0 should be int", IntValue.class, record.getField(0, IntValue.class).getClass());
- Assert.assertEquals("Field 1 should be String", StringValue.class, record.getField(1, StringValue.class).getClass());
- Assert.assertEquals("Field 2 should be String", StringValue.class, record.getField(2, StringValue.class).getClass());
- Assert.assertEquals("Field 3 should be float", DoubleValue.class, record.getField(3, DoubleValue.class).getClass());
- Assert.assertEquals("Field 4 should be int", IntValue.class, record.getField(4, IntValue.class).getClass());
-
- int[] pos = {0, 1, 2, 3, 4};
- Value[] values = {new IntValue(), new StringValue(), new StringValue(), new DoubleValue(), new IntValue()};
- Assert.assertTrue(record.equalsFields(pos, dbData[recordCount], values));
-
- recordCount++;
- }
- Assert.assertEquals(5, recordCount);
-
- cleanUpDerbyDatabases();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java b/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
deleted file mode 100644
index c824ea1..0000000
--- a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.junit.Assert;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class JDBCOutputFormatTest {
- private JDBCInputFormat jdbcInputFormat;
- private JDBCOutputFormat jdbcOutputFormat;
-
- private static Connection conn;
-
- static final Value[][] dbData = {
- {new IntValue(1001), new StringValue("Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)},
- {new IntValue(1002), new StringValue("More Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)},
- {new IntValue(1003), new StringValue("More Java for more dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new IntValue(33)},
- {new IntValue(1004), new StringValue("A Cup of Java"), new StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)},
- {new IntValue(1005), new StringValue("A Teaspoon of Java"), new StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}};
-
- @BeforeClass
- public static void setUpClass() {
- try {
- System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
- prepareDerbyInputDatabase();
- prepareDerbyOutputDatabase();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void cleanUpDerbyDatabases() {
- try {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- conn = DriverManager.getConnection(dbURL);
- Statement stat = conn.createStatement();
- stat.executeUpdate("DROP TABLE books");
- stat.executeUpdate("DROP TABLE newbooks");
- stat.close();
- conn.close();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void prepareDerbyInputDatabase() throws ClassNotFoundException {
- try {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- conn = DriverManager.getConnection(dbURL);
- createTableBooks();
- insertDataToSQLTables();
- conn.close();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- Assert.fail();
- } catch (SQLException e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void prepareDerbyOutputDatabase() throws ClassNotFoundException {
- try {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- conn = DriverManager.getConnection(dbURL);
- createTableNewBooks();
- conn.close();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- Assert.fail();
- } catch (SQLException e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void createTableBooks() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- Statement stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
- }
-
- private static void createTableNewBooks() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- Statement stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
- }
-
- private static void insertDataToSQLTables() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
- sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
- sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
- sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
- sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
- sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
- Statement stat = conn.createStatement();
- stat.execute(sqlQueryBuilder.toString());
- stat.close();
- }
-
-
- @After
- public void tearDown() {
- jdbcOutputFormat = null;
- cleanUpDerbyDatabases();
- }
-
- @Test
- public void testJDBCOutputFormat() throws IOException {
- String sourceTable = "books";
- String targetTable = "newbooks";
- String driverPath = "org.apache.derby.jdbc.EmbeddedDriver";
- String dbUrl = "jdbc:derby:memory:ebookshop";
-
- Configuration cfg = new Configuration();
- cfg.setString("driver", driverPath);
- cfg.setString("url", dbUrl);
- cfg.setString("query", "insert into " + targetTable + " (id, title, author, price, qty) values (?,?,?,?,?)");
- cfg.setInteger("fields", 5);
- cfg.setClass("type0", IntValue.class);
- cfg.setClass("type1", StringValue.class);
- cfg.setClass("type2", StringValue.class);
- cfg.setClass("type3", FloatValue.class);
- cfg.setClass("type4", IntValue.class);
-
- jdbcOutputFormat = new JDBCOutputFormat();
- jdbcOutputFormat.configure(cfg);
- jdbcOutputFormat.open(0,1);
-
- jdbcInputFormat = new JDBCInputFormat(
- driverPath,
- dbUrl,
- "select * from " + sourceTable);
- jdbcInputFormat.configure(null);
-
- Record record = new Record();
- while (!jdbcInputFormat.reachedEnd()) {
- jdbcInputFormat.nextRecord(record);
- jdbcOutputFormat.writeRecord(record);
- }
-
- jdbcOutputFormat.close();
- jdbcInputFormat.close();
-
- jdbcInputFormat = new JDBCInputFormat(
- driverPath,
- dbUrl,
- "select * from " + targetTable);
- jdbcInputFormat.configure(null);
-
- int recordCount = 0;
- while (!jdbcInputFormat.reachedEnd()) {
- jdbcInputFormat.nextRecord(record);
- Assert.assertEquals(5, record.getNumFields());
- Assert.assertEquals("Field 0 should be int", IntValue.class, record.getField(0, IntValue.class).getClass());
- Assert.assertEquals("Field 1 should be String", StringValue.class, record.getField(1, StringValue.class).getClass());
- Assert.assertEquals("Field 2 should be String", StringValue.class, record.getField(2, StringValue.class).getClass());
- Assert.assertEquals("Field 3 should be float", DoubleValue.class, record.getField(3, DoubleValue.class).getClass());
- Assert.assertEquals("Field 4 should be int", IntValue.class, record.getField(4, IntValue.class).getClass());
-
- int[] pos = {0, 1, 2, 3, 4};
- Value[] values = {new IntValue(), new StringValue(), new StringValue(), new DoubleValue(), new IntValue()};
- Assert.assertTrue(record.equalsFields(pos, dbData[recordCount], values));
-
- recordCount++;
- }
- Assert.assertEquals(5, recordCount);
-
- jdbcInputFormat.close();
- }
-}
[3/3] flink git commit: [FLINK-2442] [fix] FieldPositionKeys support
Pojo fields
Posted by fh...@apache.org.
[FLINK-2442] [fix] FieldPositionKeys support Pojo fields
This closes #963
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30761572
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30761572
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30761572
Branch: refs/heads/master
Commit: 30761572b5040669b07d261ec9b109797debc549
Parents: b2d8c40
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Jul 30 21:44:06 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Aug 4 18:16:30 2015 +0200
----------------------------------------------------------------------
.../apache/flink/api/java/operators/Keys.java | 50 ++++++++++----------
.../api/java/typeutils/TupleTypeInfoBase.java | 20 --------
.../flink/api/java/operators/KeysTest.java | 27 +++++++++++
3 files changed, 52 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/30761572/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 69d306f..09874e5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -223,43 +223,43 @@ public abstract class Keys<T> {
} else {
groupingFields = rangeCheckFields(groupingFields, type.getArity() -1);
}
- CompositeType<?> compositeType = (CompositeType<?>) type;
Preconditions.checkArgument(groupingFields.length > 0, "Grouping fields can not be empty at this point");
keyFields = new ArrayList<FlatFieldDescriptor>(type.getTotalFields());
// for each key, find the field:
for(int j = 0; j < groupingFields.length; j++) {
+ int keyPos = groupingFields[j];
+
+ int offset = 0;
for(int i = 0; i < type.getArity(); i++) {
- TypeInformation<?> fieldType = compositeType.getTypeAt(i);
-
- if(groupingFields[j] == i) { // check if user set the key
- int keyId = countNestedElementsBefore(compositeType, i) + i;
- if(fieldType instanceof TupleTypeInfoBase) {
- TupleTypeInfoBase<?> tupleFieldType = (TupleTypeInfoBase<?>) fieldType;
- tupleFieldType.addAllFields(keyId, keyFields);
- } else {
- Preconditions.checkArgument(fieldType instanceof AtomicType, "Wrong field type");
- keyFields.add(new FlatFieldDescriptor(keyId, fieldType));
+
+ TypeInformation fieldType = ((CompositeType<?>) type).getTypeAt(i);
+ if(i < keyPos) {
+ // not yet there, increment key offset
+ offset += fieldType.getTotalFields();
+ }
+ else {
+ // arrived at key position
+ if(fieldType instanceof CompositeType) {
+ // add all nested fields of composite type
+ ((CompositeType) fieldType).getFlatFields("*", offset, keyFields);
}
-
+ else if(fieldType instanceof AtomicType) {
+ // add atomic type field
+ keyFields.add(new FlatFieldDescriptor(offset, fieldType));
+ }
+ else {
+ // type should either be composite or atomic
+ throw new InvalidProgramException("Field type is neither CompositeType nor AtomicType: "+fieldType);
+ }
+ // go to next key
+ break;
}
}
}
keyFields = removeNullElementsFromList(keyFields);
}
-
- private static int countNestedElementsBefore(CompositeType<?> compositeType, int pos) {
- if( pos == 0) {
- return 0;
- }
- int ret = 0;
- for (int i = 0; i < pos; i++) {
- TypeInformation<?> fieldType = compositeType.getTypeAt(i);
- ret += fieldType.getTotalFields() -1;
- }
- return ret;
- }
-
+
public static <R> List<R> removeNullElementsFromList(List<R> in) {
List<R> elements = new ArrayList<R>();
for(R e: in) {
http://git-wip-us.apache.org/repos/asf/flink/blob/30761572/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index 3314ca9..881e690 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
@@ -88,25 +87,6 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
return tupleType;
}
- /**
- * Recursively add all fields in this tuple type. We need this in particular to get all
- * the types.
- * @param startKeyId
- * @param keyFields
- */
- public void addAllFields(int startKeyId, List<FlatFieldDescriptor> keyFields) {
- for(int i = 0; i < this.getArity(); i++) {
- TypeInformation<?> type = this.types[i];
- if(type instanceof AtomicType) {
- keyFields.add(new FlatFieldDescriptor(startKeyId, type));
- } else if(type instanceof TupleTypeInfoBase<?>) {
- TupleTypeInfoBase<?> ttb = (TupleTypeInfoBase<?>) type;
- ttb.addAllFields(startKeyId, keyFields);
- }
- startKeyId += type.getTotalFields();
- }
- }
-
@Override
public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
http://git-wip-us.apache.org/repos/asf/flink/blob/30761572/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
index 67d0240..cf8936d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest.ComplexNestedClass;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.junit.Assert;
@@ -254,4 +255,30 @@ public class KeysTest {
ek = new ExpressionKeys<PojoWithMultiplePojos>(new String[]{"i0"}, ti);
Assert.assertArrayEquals(new int[] {0}, ek.computeLogicalKeyPositions());
}
+
+ @Test
+ public void testTupleWithNestedPojo() {
+
+ TypeInformation<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>> ti =
+ new TupleTypeInfo<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ TypeExtractor.getForClass(Pojo1.class),
+ TypeExtractor.getForClass(PojoWithMultiplePojos.class)
+ );
+
+ ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>> ek;
+
+ ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{0}, ti);
+ Assert.assertArrayEquals(new int[] {0}, ek.computeLogicalKeyPositions());
+
+ ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{1}, ti);
+ Assert.assertArrayEquals(new int[] {1,2}, ek.computeLogicalKeyPositions());
+
+ ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{2}, ti);
+ Assert.assertArrayEquals(new int[] {3,4,5,6,7}, ek.computeLogicalKeyPositions());
+
+ ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{}, ti, true);
+ Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6,7}, ek.computeLogicalKeyPositions());
+
+ }
}
[2/3] flink git commit: [FLINK-2205] Fix confusing entries in
JobManager WebUI JobConfig section.
Posted by fh...@apache.org.
[FLINK-2205] Fix confusing entries in JobManager WebUI JobConfig section.
The 'Number of execution retries' entry is renamed to 'Max. number of execution retries'.
Its default display (value -1) is now 'deactivated', and for 'Job parallelism' it is 'auto', as suggested in JIRA.
This closes #927
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b2d8c40a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b2d8c40a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b2d8c40a
Branch: refs/heads/master
Commit: b2d8c40a06e0a36e90913d316ff2b003b701fee1
Parents: 06b37bf
Author: Enrique Bautista <eb...@gmail.com>
Authored: Tue Jul 21 18:52:05 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Aug 4 18:14:52 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/jobmanager/web/JobManagerInfoServlet.java | 2 +-
.../src/main/resources/web-docs-infoserver/js/analyzer.js | 4 ++++
2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b2d8c40a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
index ce57714..0ecc941 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -436,7 +436,7 @@ public class JobManagerInfoServlet extends HttpServlet {
if(ec != null) {
wrt.write("\"executionConfig\": {");
wrt.write("\"Execution Mode\": \""+ec.getExecutionMode()+"\",");
- wrt.write("\"Number of execution retries\": \""+ec.getNumberOfExecutionRetries()+"\",");
+ wrt.write("\"Max. number of execution retries\": \""+ec.getNumberOfExecutionRetries()+"\",");
wrt.write("\"Job parallelism\": \""+ec.getParallelism()+"\",");
wrt.write("\"Object reuse mode\": \""+ec.isObjectReuseEnabled()+"\"");
ExecutionConfig.GlobalJobParameters uc = ec.getGlobalJobParameters();
http://git-wip-us.apache.org/repos/asf/flink/blob/b2d8c40a/flink-runtime/src/main/resources/web-docs-infoserver/js/analyzer.js
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/js/analyzer.js b/flink-runtime/src/main/resources/web-docs-infoserver/js/analyzer.js
index 3934017..4030f80 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/js/analyzer.js
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/js/analyzer.js
@@ -107,6 +107,10 @@ function analyzeTime(json, stacked) {
$.each(job.executionConfig, function(key, value) {
if(key == "userConfig") {
return;
+ } else if(key == "Max. number of execution retries" && value == -1) {
+ value = "deactivated";
+ } else if(key == "Job parallelism" && value == -1) {
+ value = "auto";
}
configTable += "<tr><td>"+key+"</td><td>"+value+"</td></tr>";
});