You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:36 UTC
[46/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
deleted file mode 100644
index b4246f5..0000000
--- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.sql.Array;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Arrays;
-
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * InputFormat to read data from a database and generate Rows.
- * The InputFormat has to be configured using the supplied InputFormatBuilder.
- * A valid RowTypeInfo must be properly configured in the builder, e.g.: <br>
- *
- * <pre><code>
- * TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
- *     BasicTypeInfo.INT_TYPE_INFO,
- *     BasicTypeInfo.STRING_TYPE_INFO,
- *     BasicTypeInfo.STRING_TYPE_INFO,
- *     BasicTypeInfo.DOUBLE_TYPE_INFO,
- *     BasicTypeInfo.INT_TYPE_INFO
- * };
- *
- * RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
- *
- * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- *     .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
- *     .setDBUrl("jdbc:derby:memory:ebookshop")
- *     .setQuery("select * from books")
- *     .setRowTypeInfo(rowTypeInfo)
- *     .finish();
- * </code></pre>
- *
- * In order to query the JDBC source in parallel, you need to provide a
- * parameterized query template (i.e. a valid {@link PreparedStatement}) and
- * a {@link ParameterValuesProvider} which provides binding values for the
- * query parameters. E.g.:<br>
- *
- * <pre><code>
- *
- * Serializable[][] queryParameters = new String[2][1];
- * queryParameters[0] = new String[]{"Kumar"};
- * queryParameters[1] = new String[]{"Tan Ah Teck"};
- *
- * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- *     .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
- *     .setDBUrl("jdbc:derby:memory:ebookshop")
- *     .setQuery("select * from books WHERE author = ?")
- *     .setRowTypeInfo(rowTypeInfo)
- *     .setParametersProvider(new GenericParameterValuesProvider(queryParameters))
- *     .finish();
- * </code></pre>
- *
- * @see Row
- * @see ParameterValuesProvider
- * @see PreparedStatement
- * @see DriverManager
- */
-public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements ResultTypeQueryable {
-
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
-
-    private String username;
-    private String password;
-    private String drivername;
-    private String dbURL;
-    private String queryTemplate;
-    private int resultSetType;
-    private int resultSetConcurrency;
-    private RowTypeInfo rowTypeInfo;
-
-    // JDBC handles are created in openInputFormat()/open() and are excluded from serialization
-    private transient Connection dbConn;
-    private transient PreparedStatement statement;
-    private transient ResultSet resultSet;
-
-    // true while resultSet is positioned on a row that has not been handed out yet
-    private boolean hasNext;
-    // one row of bind values per input split; null means a single, non-parameterized query
-    private Object[][] parameterValues;
-
-    public JDBCInputFormat() {
-    }
-
-    /**
-     * @return the {@link RowTypeInfo} that was configured through the builder
-     */
-    @Override
-    public RowTypeInfo getProducedType() {
-        return rowTypeInfo;
-    }
-
-    @Override
-    public void configure(Configuration parameters) {
-        //do nothing here
-    }
-
-    /**
-     * Loads the JDBC driver, connects to the database and prepares the query template.
-     * Called once per inputFormat (on open).
-     *
-     * @throws IllegalArgumentException if the driver class cannot be found or a JDBC call fails
-     */
-    @Override
-    public void openInputFormat() {
-        try {
-            Class.forName(drivername);
-            if (username == null) {
-                dbConn = DriverManager.getConnection(dbURL);
-            } else {
-                dbConn = DriverManager.getConnection(dbURL, username, password);
-            }
-            statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
-        } catch (SQLException se) {
-            throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
-        } catch (ClassNotFoundException cnfe) {
-            throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
-        }
-    }
-
-    /**
-     * Closes the prepared statement and the connection and drops the parameter values.
-     * Called once per inputFormat (on close). Failures to close are logged, not rethrown.
-     */
-    @Override
-    public void closeInputFormat() {
-        try {
-            if (statement != null) {
-                statement.close();
-            }
-        } catch (SQLException se) {
-            // pass the exception along so that the stack trace is not lost
-            LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage(), se);
-        } finally {
-            statement = null;
-        }
-
-        try {
-            if (dbConn != null) {
-                dbConn.close();
-            }
-        } catch (SQLException se) {
-            LOG.info("Inputformat couldn't be closed - " + se.getMessage(), se);
-        } finally {
-            dbConn = null;
-        }
-
-        parameterValues = null;
-    }
-
-    /**
-     * Connects to the source database and executes the query in a <b>parallel
-     * fashion</b> if
-     * this {@link InputFormat} is built using a parameterized query (i.e. using
-     * a {@link PreparedStatement})
-     * and a proper {@link ParameterValuesProvider}, in a <b>non-parallel
-     * fashion</b> otherwise.
-     *
-     * @param inputSplit which is ignored if this InputFormat is executed as a
-     *        non-parallel source,
-     *        a "hook" to the query parameters otherwise (using its
-     *        <i>splitNumber</i>)
-     * @throws IOException declared by the interface; not thrown by this implementation
-     * @throws IllegalArgumentException if a parameter is null or of an unsupported type,
-     *         or if the execution of the query fails
-     */
-    @Override
-    public void open(InputSplit inputSplit) throws IOException {
-        try {
-            if (inputSplit != null && parameterValues != null) {
-                // look the row of bind values up once instead of on every loop iteration
-                final Object[] splitParameters = parameterValues[inputSplit.getSplitNumber()];
-                for (int i = 0; i < splitParameters.length; i++) {
-                    bindParameter(i, splitParameters[i]);
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(splitParameters)));
-                }
-            }
-            resultSet = statement.executeQuery();
-            hasNext = resultSet.next();
-        } catch (SQLException se) {
-            throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
-        }
-    }
-
-    /**
-     * Binds one query parameter to the prepared statement, choosing the setter by the runtime type of the value.
-     *
-     * @param index zero-based index of the parameter (the JDBC position is {@code index + 1})
-     * @param param value to bind
-     * @throws SQLException if the JDBC driver rejects the value
-     * @throws IllegalArgumentException if the value is null or its type is not supported
-     */
-    private void bindParameter(int index, Object param) throws SQLException {
-        final int position = index + 1;
-        if (param == null) {
-            // previously this ended in a NullPointerException thrown by param.getClass() below
-            throw new IllegalArgumentException("open() failed. Parameter " + index + " is null, null query parameters are not handled (yet).");
-        } else if (param instanceof String) {
-            statement.setString(position, (String) param);
-        } else if (param instanceof Long) {
-            statement.setLong(position, (Long) param);
-        } else if (param instanceof Integer) {
-            statement.setInt(position, (Integer) param);
-        } else if (param instanceof Double) {
-            statement.setDouble(position, (Double) param);
-        } else if (param instanceof Boolean) {
-            statement.setBoolean(position, (Boolean) param);
-        } else if (param instanceof Float) {
-            statement.setFloat(position, (Float) param);
-        } else if (param instanceof BigDecimal) {
-            statement.setBigDecimal(position, (BigDecimal) param);
-        } else if (param instanceof Byte) {
-            statement.setByte(position, (Byte) param);
-        } else if (param instanceof Short) {
-            statement.setShort(position, (Short) param);
-        } else if (param instanceof Date) {
-            statement.setDate(position, (Date) param);
-        } else if (param instanceof Time) {
-            statement.setTime(position, (Time) param);
-        } else if (param instanceof Timestamp) {
-            statement.setTimestamp(position, (Timestamp) param);
-        } else if (param instanceof Array) {
-            statement.setArray(position, (Array) param);
-        } else {
-            //extends with other types if needed
-            throw new IllegalArgumentException("open() failed. Parameter " + index + " of type " + param.getClass() + " is not handled (yet)." );
-        }
-    }
-
-    /**
-     * Closes the result set of the current split, if there is one.
-     * A failure to close is logged, not rethrown.
-     *
-     * @throws IOException declared by the interface; not thrown by this implementation
-     */
-    @Override
-    public void close() throws IOException {
-        if (resultSet == null) {
-            return;
-        }
-        try {
-            resultSet.close();
-        } catch (SQLException se) {
-            LOG.info("Inputformat ResultSet couldn't be closed - " + se.getMessage(), se);
-        }
-    }
-
-    /**
-     * Checks whether all data has been read.
-     *
-     * @return boolean value indication whether all data has been read.
-     * @throws IOException declared by the interface; not thrown by this implementation
-     */
-    @Override
-    public boolean reachedEnd() throws IOException {
-        return !hasNext;
-    }
-
-    /**
-     * Stores the current resultSet row in the given Row and advances the result set.
-     *
-     * @param row row to be reused.
-     * @return the given row filled with the values of the current record,
-     *         or {@code null} if there is no record left
-     * @throws java.io.IOException if the result set cannot be read or accessed
-     */
-    @Override
-    public Row nextRecord(Row row) throws IOException {
-        try {
-            if (!hasNext) {
-                return null;
-            }
-            for (int pos = 0; pos < row.productArity(); pos++) {
-                row.setField(pos, resultSet.getObject(pos + 1));
-            }
-            //update hasNext after we've read the record
-            hasNext = resultSet.next();
-            return row;
-        } catch (SQLException se) {
-            throw new IOException("Couldn't read data - " + se.getMessage(), se);
-        } catch (NullPointerException npe) {
-            throw new IOException("Couldn't access resultSet", npe);
-        }
-    }
-
-    @Override
-    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
-        return cachedStatistics;
-    }
-
-    /**
-     * Creates one split per row of parameter values, or a single split if no
-     * parameter values were configured. The requested minimum is ignored.
-     */
-    @Override
-    public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
-        if (parameterValues == null) {
-            return new GenericInputSplit[]{new GenericInputSplit(0, 1)};
-        }
-        GenericInputSplit[] ret = new GenericInputSplit[parameterValues.length];
-        for (int i = 0; i < ret.length; i++) {
-            ret[i] = new GenericInputSplit(i, ret.length);
-        }
-        return ret;
-    }
-
-    @Override
-    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
-        return new DefaultInputSplitAssigner(inputSplits);
-    }
-
-    /**
-     * A builder used to set parameters to the input format's configuration in a fluent way.
-     * @return builder
-     */
-    public static JDBCInputFormatBuilder buildJDBCInputFormat() {
-        return new JDBCInputFormatBuilder();
-    }
-
-    /**
-     * Fluent builder that configures a single {@link JDBCInputFormat} instance
-     * and validates the configuration in {@link #finish()}.
-     */
-    public static class JDBCInputFormatBuilder {
-        private final JDBCInputFormat format;
-
-        public JDBCInputFormatBuilder() {
-            this.format = new JDBCInputFormat();
-            //using TYPE_FORWARD_ONLY for high performance reads
-            this.format.resultSetType = ResultSet.TYPE_FORWARD_ONLY;
-            this.format.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
-        }
-
-        public JDBCInputFormatBuilder setUsername(String username) {
-            format.username = username;
-            return this;
-        }
-
-        public JDBCInputFormatBuilder setPassword(String password) {
-            format.password = password;
-            return this;
-        }
-
-        public JDBCInputFormatBuilder setDrivername(String drivername) {
-            format.drivername = drivername;
-            return this;
-        }
-
-        public JDBCInputFormatBuilder setDBUrl(String dbURL) {
-            format.dbURL = dbURL;
-            return this;
-        }
-
-        public JDBCInputFormatBuilder setQuery(String query) {
-            format.queryTemplate = query;
-            return this;
-        }
-
-        public JDBCInputFormatBuilder setResultSetType(int resultSetType) {
-            format.resultSetType = resultSetType;
-            return this;
-        }
-
-        public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency) {
-            format.resultSetConcurrency = resultSetConcurrency;
-            return this;
-        }
-
-        /**
-         * Asks the provider for its parameter values right away and stores them in the format.
-         */
-        public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider) {
-            format.parameterValues = parameterValuesProvider.getParameterValues();
-            return this;
-        }
-
-        public JDBCInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo) {
-            format.rowTypeInfo = rowTypeInfo;
-            return this;
-        }
-
-        /**
-         * Finalizes the configuration and checks validity.
-         *
-         * @return Configured JDBCInputFormat
-         * @throws IllegalArgumentException if the database URL, the query, the driver
-         *         or the RowTypeInfo has not been supplied
-         */
-        public JDBCInputFormat finish() {
-            if (format.username == null) {
-                LOG.info("Username was not supplied separately.");
-            }
-            if (format.password == null) {
-                LOG.info("Password was not supplied separately.");
-            }
-            if (format.dbURL == null) {
-                throw new IllegalArgumentException("No database URL supplied");
-            }
-            if (format.queryTemplate == null) {
-                throw new IllegalArgumentException("No query supplied");
-            }
-            if (format.drivername == null) {
-                throw new IllegalArgumentException("No driver supplied");
-            }
-            if (format.rowTypeInfo == null) {
-                throw new IllegalArgumentException("No " + RowTypeInfo.class.getSimpleName() + " supplied");
-            }
-            if (format.parameterValues == null) {
-                LOG.debug("No input splitting configured (data will be read with parallelism 1).");
-            }
-            return format;
-        }
-
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
deleted file mode 100644
index da4b1ad..0000000
--- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.apache.flink.api.common.io.RichOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * OutputFormat to write {@link Row}s into a database.
- * The OutputFormat has to be configured using the supplied OutputFormatBuilder.
- *
- * @see Row
- * @see Tuple
- * @see DriverManager
- */
-public class JDBCOutputFormat extends RichOutputFormat<Row> {
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);
-
-    private String username;
-    private String password;
-    private String drivername;
-    private String dbURL;
-    private String query;
-    // number of records that are collected before the batch is executed
-    private int batchInterval = 5000;
-
-    // JDBC handles are created in open(); transient like their counterparts in JDBCInputFormat
-    private transient Connection dbConn;
-    private transient PreparedStatement upload;
-
-    // number of records added to the current, not yet executed batch
-    private int batchCount = 0;
-
-    // java.sql.Types constant per column; null means that no column types were specified
-    public int[] typesArray;
-
-    public JDBCOutputFormat() {
-    }
-
-    @Override
-    public void configure(Configuration parameters) {
-    }
-
-    /**
-     * Connects to the target database and initializes the prepared statement.
-     *
-     * @param taskNumber The number of the parallel instance.
-     * @param numTasks The number of parallel tasks (not used by this implementation).
-     * @throws IOException declared by the interface; not thrown by this implementation
-     * @throws IllegalArgumentException if the driver class cannot be found or a JDBC call fails
-     */
-    @Override
-    public void open(int taskNumber, int numTasks) throws IOException {
-        try {
-            establishConnection();
-            upload = dbConn.prepareStatement(query);
-        } catch (SQLException sqe) {
-            throw new IllegalArgumentException("open() failed.", sqe);
-        } catch (ClassNotFoundException cnfe) {
-            throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
-        }
-    }
-
-    /**
-     * Loads the driver class and opens the connection, with credentials only if a username was set.
-     */
-    private void establishConnection() throws SQLException, ClassNotFoundException {
-        Class.forName(drivername);
-        if (username == null) {
-            dbConn = DriverManager.getConnection(dbURL);
-        } else {
-            dbConn = DriverManager.getConnection(dbURL, username, password);
-        }
-    }
-
-    /**
-     * Adds a record to the prepared statement.
-     * <p>
-     * When this method is called, the output format is guaranteed to be opened.
-     * </p>
-     *
-     * WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
-     * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null))
-     *
-     * @param row The records to add to the output.
-     * @see PreparedStatement
-     * @throws IOException declared by the interface; not thrown by this implementation
-     * @throws IllegalArgumentException if a JDBC call fails
-     */
-    @Override
-    public void writeRecord(Row row) throws IOException {
-
-        if (typesArray != null && typesArray.length > 0 && typesArray.length != row.productArity()) {
-            LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
-        }
-        try {
-
-            if (typesArray == null) {
-                // no types provided
-                for (int index = 0; index < row.productArity(); index++) {
-                    final Object value = row.productElement(index);
-                    // SLF4J substitutes "{}" placeholders; with "%s" the arguments were silently dropped
-                    LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.", index + 1, value);
-                    upload.setObject(index + 1, value);
-                }
-            } else {
-                // types provided
-                for (int index = 0; index < row.productArity(); index++) {
-                    final Object value = row.productElement(index);
-
-                    if (value == null) {
-                        upload.setNull(index + 1, typesArray[index]);
-                    } else {
-                        // casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
-                        switch (typesArray[index]) {
-                            case java.sql.Types.NULL:
-                                upload.setNull(index + 1, typesArray[index]);
-                                break;
-                            case java.sql.Types.BOOLEAN:
-                            case java.sql.Types.BIT:
-                                upload.setBoolean(index + 1, (boolean) value);
-                                break;
-                            case java.sql.Types.CHAR:
-                            case java.sql.Types.NCHAR:
-                            case java.sql.Types.VARCHAR:
-                            case java.sql.Types.LONGVARCHAR:
-                            case java.sql.Types.LONGNVARCHAR:
-                                upload.setString(index + 1, (String) value);
-                                break;
-                            case java.sql.Types.TINYINT:
-                                upload.setByte(index + 1, (byte) value);
-                                break;
-                            case java.sql.Types.SMALLINT:
-                                upload.setShort(index + 1, (short) value);
-                                break;
-                            case java.sql.Types.INTEGER:
-                                upload.setInt(index + 1, (int) value);
-                                break;
-                            case java.sql.Types.BIGINT:
-                                upload.setLong(index + 1, (long) value);
-                                break;
-                            case java.sql.Types.REAL:
-                                upload.setFloat(index + 1, (float) value);
-                                break;
-                            case java.sql.Types.FLOAT:
-                            case java.sql.Types.DOUBLE:
-                                upload.setDouble(index + 1, (double) value);
-                                break;
-                            case java.sql.Types.DECIMAL:
-                            case java.sql.Types.NUMERIC:
-                                upload.setBigDecimal(index + 1, (java.math.BigDecimal) value);
-                                break;
-                            case java.sql.Types.DATE:
-                                upload.setDate(index + 1, (java.sql.Date) value);
-                                break;
-                            case java.sql.Types.TIME:
-                                upload.setTime(index + 1, (java.sql.Time) value);
-                                break;
-                            case java.sql.Types.TIMESTAMP:
-                                upload.setTimestamp(index + 1, (java.sql.Timestamp) value);
-                                break;
-                            case java.sql.Types.BINARY:
-                            case java.sql.Types.VARBINARY:
-                            case java.sql.Types.LONGVARBINARY:
-                                upload.setBytes(index + 1, (byte[]) value);
-                                break;
-                            default:
-                                // types without a dedicated case, e.g. SQLXML, ARRAY, JAVA_OBJECT, BLOB, CLOB,
-                                // NCLOB, DATALINK, DISTINCT, OTHER, REF, ROWID, STRUCT
-                                upload.setObject(index + 1, value);
-                                LOG.warn("Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.",
-                                    typesArray[index], index + 1, value);
-                        }
-                    }
-                }
-            }
-            upload.addBatch();
-            batchCount++;
-            if (batchCount >= batchInterval) {
-                upload.executeBatch();
-                batchCount = 0;
-            }
-        } catch (SQLException | IllegalArgumentException e) {
-            throw new IllegalArgumentException("writeRecord() failed", e);
-        }
-    }
-
-    /**
-     * Executes prepared statement and closes all resources of this instance.
-     * JDBC failures are logged, not rethrown.
-     *
-     * @throws IOException declared by the interface; not thrown by this implementation
-     */
-    @Override
-    public void close() throws IOException {
-        try {
-            if (upload != null) {
-                // flush the records that are still pending in the current batch
-                upload.executeBatch();
-                upload.close();
-            }
-        } catch (SQLException se) {
-            // NOTE(review): a failed final batch is only logged here, so pending records can be lost silently
-            LOG.info("Outputformat Statement couldn't be closed - " + se.getMessage(), se);
-        } finally {
-            upload = null;
-            batchCount = 0;
-        }
-
-        try {
-            if (dbConn != null) {
-                dbConn.close();
-            }
-        } catch (SQLException se) {
-            LOG.info("Outputformat couldn't be closed - " + se.getMessage(), se);
-        } finally {
-            dbConn = null;
-        }
-    }
-
-    /**
-     * Creates a builder used to configure a JDBCOutputFormat in a fluent way.
-     *
-     * @return builder
-     */
-    public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
-        return new JDBCOutputFormatBuilder();
-    }
-
-    /**
-     * Fluent builder that configures a single {@link JDBCOutputFormat} instance
-     * and validates the configuration in {@link #finish()}.
-     */
-    public static class JDBCOutputFormatBuilder {
-        private final JDBCOutputFormat format;
-
-        protected JDBCOutputFormatBuilder() {
-            this.format = new JDBCOutputFormat();
-        }
-
-        public JDBCOutputFormatBuilder setUsername(String username) {
-            format.username = username;
-            return this;
-        }
-
-        public JDBCOutputFormatBuilder setPassword(String password) {
-            format.password = password;
-            return this;
-        }
-
-        public JDBCOutputFormatBuilder setDrivername(String drivername) {
-            format.drivername = drivername;
-            return this;
-        }
-
-        public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
-            format.dbURL = dbURL;
-            return this;
-        }
-
-        public JDBCOutputFormatBuilder setQuery(String query) {
-            format.query = query;
-            return this;
-        }
-
-        public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
-            format.batchInterval = batchInterval;
-            return this;
-        }
-
-        public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
-            format.typesArray = typesArray;
-            return this;
-        }
-
-        /**
-         * Finalizes the configuration and checks validity.
-         *
-         * @return Configured JDBCOutputFormat
-         * @throws IllegalArgumentException if the database URL, the query or the driver has not been supplied
-         */
-        public JDBCOutputFormat finish() {
-            if (format.username == null) {
-                LOG.info("Username was not supplied separately.");
-            }
-            if (format.password == null) {
-                LOG.info("Password was not supplied separately.");
-            }
-            if (format.dbURL == null) {
-                throw new IllegalArgumentException("No dababase URL supplied.");
-            }
-            if (format.query == null) {
-                throw new IllegalArgumentException("No query suplied");
-            }
-            if (format.drivername == null) {
-                throw new IllegalArgumentException("No driver supplied");
-            }
-
-            return format;
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
deleted file mode 100644
index 2ed2f8c..0000000
--- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.io.jdbc.split;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-
-/**
- * {@link ParameterValuesProvider} backed by query parameters that the user
- * computed before creating the {@link JDBCInputFormat} instance. It performs
- * no computation of its own and simply hands those parameters back.
- */
-public class GenericParameterValuesProvider implements ParameterValuesProvider {
-
-    private final Serializable[][] precomputedValues;
-
-    /**
-     * @param parameters the externally computed query parameters to wrap
-     */
-    public GenericParameterValuesProvider(Serializable[][] parameters) {
-        precomputedValues = parameters;
-    }
-
-    /**
-     * @return the array that was passed to the constructor
-     */
-    @Override
-    public Serializable[][] getParameterValues() {
-        // nothing to compute here, the values were supplied through the constructor
-        return precomputedValues;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
deleted file mode 100644
index ac56b98..0000000
--- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.io.jdbc.split;
-
-import java.io.Serializable;
-
-/**
- *
- * This query generator assumes that the query to parameterize contains a BETWEEN constraint on a numeric column.
- * It splits the inclusive range from the min value up to the max into consecutive sub-ranges.
- * Every sub-range covers fetchSize values, apart from the last one, which ends at max and may be shorter.
- *
- * For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>, using a query like:
- * <PRE>
- * SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
- * </PRE>
- *
- * you can use this class to automatically generate the parameters of the BETWEEN clause,
- * based on the passed constructor parameters.
- *
- * */
-public class NumericBetweenParametersProvider implements ParameterValuesProvider {
-
-    private final long fetchSize;
-    private final long min;
-    private final long max;
-
-    /**
-     * @param fetchSize number of values covered by each generated sub-range
-     * @param min lower bound of the range (inclusive)
-     * @param max upper bound of the range (inclusive)
-     */
-    public NumericBetweenParametersProvider(long fetchSize, long min, long max) {
-        this.fetchSize = fetchSize;
-        this.min = min;
-        this.max = max;
-    }
-
-    /**
-     * Computes the bounds of all sub-ranges.
-     *
-     * @return one {@code Long[]{lower, upper}} row per sub-range, both bounds inclusive;
-     *         an empty array if min is greater than max
-     * @throws IllegalArgumentException if fetchSize is not positive or the range is too large to be split
-     */
-    @Override
-    public Serializable[][] getParameterValues() {
-        if (fetchSize <= 0) {
-            throw new IllegalArgumentException("Fetch size must be greater than 0, but is " + fetchSize);
-        }
-        if (min > max) {
-            // empty range, there is nothing to query
-            return new Serializable[0][2];
-        }
-        final long span = max - min;
-        if (span < 0) {
-            // max - min does not fit into a long
-            throw new IllegalArgumentException("The range [" + min + ", " + max + "] is too large");
-        }
-        // number of sub-ranges is ceil((span + 1) / fetchSize), written so that span + 1 cannot overflow
-        final long rangeCount = span / fetchSize + 1;
-        if (rangeCount > Integer.MAX_VALUE) {
-            throw new IllegalArgumentException("Fetch size " + fetchSize + " is too small for the range ["
-                + min + ", " + max + "]");
-        }
-        final int size = (int) rangeCount;
-        final Serializable[][] parameters = new Serializable[size][2];
-        for (int i = 0; i < size; i++) {
-            final long lower = min + i * fetchSize;
-            // the last sub-range is cut off at max, so that no bound ever exceeds it
-            final long upper = (i == size - 1) ? max : lower + fetchSize - 1;
-            parameters[i] = new Long[]{lower, upper};
-        }
-        return parameters;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
deleted file mode 100644
index c194497..0000000
--- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.io.jdbc.split;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-
-/**
- * Contract used by the {@link JDBCInputFormat} to obtain the list of parallel queries to run (i.e. splits).
- * Every row of the matrix returned by a {@link ParameterValuesProvider} implementation
- * parameterizes one of those queries.
- */
-public interface ParameterValuesProvider {
-
-    /**
-     * Returns the necessary parameters array to use for querying a table in parallel.
-     *
-     * @return the parameter matrix, one row per query
-     */
-    Serializable[][] getParameterValues();
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
deleted file mode 100644
index da9469b..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Types;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
-import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
-import org.apache.flink.api.table.Row;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class JDBCFullTest extends JDBCTestBase {
-
- @Test
- public void testJdbcInOut() throws Exception {
- //run without parallelism
- runTest(false);
-
- //cleanup
- JDBCTestBase.tearDownClass();
- JDBCTestBase.prepareTestDb();
-
- //run expliting parallelism
- runTest(true);
-
- }
-
- private void runTest(boolean exploitParallelism) {
- ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
- JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(JDBCTestBase.DRIVER_CLASS)
- .setDBUrl(JDBCTestBase.DB_URL)
- .setQuery(JDBCTestBase.SELECT_ALL_BOOKS)
- .setRowTypeInfo(rowTypeInfo);
-
- if(exploitParallelism) {
- final int fetchSize = 1;
- final Long min = new Long(JDBCTestBase.testData[0][0].toString());
- final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0].toString());
- //use a "splittable" query to exploit parallelism
- inputBuilder = inputBuilder
- .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
- .setParametersProvider(new NumericBetweenParametersProvider(fetchSize, min, max));
- }
- DataSet<Row> source = environment.createInput(inputBuilder.finish());
-
- //NOTE: in this case (with Derby driver) setSqlTypes could be skipped, but
- //some database, doens't handle correctly null values when no column type specified
- //in PreparedStatement.setObject (see its javadoc for more details)
- source.output(JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername(JDBCTestBase.DRIVER_CLASS)
- .setDBUrl(JDBCTestBase.DB_URL)
- .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
- .setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR,Types.DOUBLE,Types.INTEGER})
- .finish());
- try {
- environment.execute();
- } catch (Exception e) {
- Assert.fail("JDBC full test failed. " + e.getMessage());
- }
-
- try (
- Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL);
- PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
- ResultSet resultSet = statement.executeQuery()
- ) {
- int count = 0;
- while (resultSet.next()) {
- count++;
- }
- Assert.assertEquals(JDBCTestBase.testData.length, count);
- } catch (SQLException e) {
- Assert.fail("JDBC full test failed. " + e.getMessage());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
deleted file mode 100644
index efae076..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.sql.ResultSet;
-
-import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
-import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
-import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.core.io.InputSplit;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class JDBCInputFormatTest extends JDBCTestBase {
-
- private JDBCInputFormat jdbcInputFormat;
-
- @After
- public void tearDown() throws IOException {
- if (jdbcInputFormat != null) {
- jdbcInputFormat.close();
- }
- jdbcInputFormat = null;
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testUntypedRowInfo() throws IOException {
- jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername("org.apache.derby.jdbc.idontexist")
- .setDBUrl(DB_URL)
- .setQuery(SELECT_ALL_BOOKS)
- .finish();
- jdbcInputFormat.openInputFormat();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidDriver() throws IOException {
- jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername("org.apache.derby.jdbc.idontexist")
- .setDBUrl(DB_URL)
- .setQuery(SELECT_ALL_BOOKS)
- .setRowTypeInfo(rowTypeInfo)
- .finish();
- jdbcInputFormat.openInputFormat();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidURL() throws IOException {
- jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
- .setQuery(SELECT_ALL_BOOKS)
- .setRowTypeInfo(rowTypeInfo)
- .finish();
- jdbcInputFormat.openInputFormat();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidQuery() throws IOException {
- jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
- .setQuery("iamnotsql")
- .setRowTypeInfo(rowTypeInfo)
- .finish();
- jdbcInputFormat.openInputFormat();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testIncompleteConfiguration() throws IOException {
- jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setQuery(SELECT_ALL_BOOKS)
- .setRowTypeInfo(rowTypeInfo)
- .finish();
- }
-
- @Test
- public void testJDBCInputFormatWithoutParallelism() throws IOException, InstantiationException, IllegalAccessException {
- jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
- .setQuery(SELECT_ALL_BOOKS)
- .setRowTypeInfo(rowTypeInfo)
- .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
- .finish();
- //this query does not exploit parallelism
- Assert.assertEquals(1, jdbcInputFormat.createInputSplits(1).length);
- jdbcInputFormat.openInputFormat();
- jdbcInputFormat.open(null);
- Row row = new Row(5);
- int recordCount = 0;
- while (!jdbcInputFormat.reachedEnd()) {
- Row next = jdbcInputFormat.nextRecord(row);
- if (next == null) {
- break;
- }
-
- if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());}
- if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());}
- if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());}
- if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());}
- if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());}
-
- for (int x = 0; x < 5; x++) {
- if(testData[recordCount][x]!=null) {
- Assert.assertEquals(testData[recordCount][x], next.productElement(x));
- }
- }
- recordCount++;
- }
- jdbcInputFormat.close();
- jdbcInputFormat.closeInputFormat();
- Assert.assertEquals(testData.length, recordCount);
- }
-
- @Test
- public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException {
- final int fetchSize = 1;
- final Long min = new Long(JDBCTestBase.testData[0][0] + "");
- final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0] + "");
- ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
- jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
- .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
- .setRowTypeInfo(rowTypeInfo)
- .setParametersProvider(pramProvider)
- .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
- .finish();
-
- jdbcInputFormat.openInputFormat();
- InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
- //this query exploit parallelism (1 split for every id)
- Assert.assertEquals(testData.length, splits.length);
- int recordCount = 0;
- Row row = new Row(5);
- for (int i = 0; i < splits.length; i++) {
- jdbcInputFormat.open(splits[i]);
- while (!jdbcInputFormat.reachedEnd()) {
- Row next = jdbcInputFormat.nextRecord(row);
- if (next == null) {
- break;
- }
- if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());}
- if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());}
- if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());}
- if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());}
- if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());}
-
- for (int x = 0; x < 5; x++) {
- if(testData[recordCount][x]!=null) {
- Assert.assertEquals(testData[recordCount][x], next.productElement(x));
- }
- }
- recordCount++;
- }
- jdbcInputFormat.close();
- }
- jdbcInputFormat.closeInputFormat();
- Assert.assertEquals(testData.length, recordCount);
- }
-
- @Test
- public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException, InstantiationException, IllegalAccessException {
- Serializable[][] queryParameters = new String[2][1];
- queryParameters[0] = new String[]{"Kumar"};
- queryParameters[1] = new String[]{"Tan Ah Teck"};
- ParameterValuesProvider paramProvider = new GenericParameterValuesProvider(queryParameters);
- jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
- .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
- .setRowTypeInfo(rowTypeInfo)
- .setParametersProvider(paramProvider)
- .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
- .finish();
- jdbcInputFormat.openInputFormat();
- InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
- //this query exploit parallelism (1 split for every queryParameters row)
- Assert.assertEquals(queryParameters.length, splits.length);
- int recordCount = 0;
- Row row = new Row(5);
- for (int i = 0; i < splits.length; i++) {
- jdbcInputFormat.open(splits[i]);
- while (!jdbcInputFormat.reachedEnd()) {
- Row next = jdbcInputFormat.nextRecord(row);
- if (next == null) {
- break;
- }
- if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());}
- if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());}
- if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());}
- if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());}
- if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());}
-
- recordCount++;
- }
- jdbcInputFormat.close();
- }
- Assert.assertEquals(3, recordCount);
- jdbcInputFormat.closeInputFormat();
- }
-
- @Test
- public void testEmptyResults() throws IOException, InstantiationException, IllegalAccessException {
- jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
- .setQuery(SELECT_EMPTY)
- .setRowTypeInfo(rowTypeInfo)
- .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
- .finish();
- jdbcInputFormat.openInputFormat();
- jdbcInputFormat.open(null);
- Row row = new Row(5);
- int recordsCnt = 0;
- while (!jdbcInputFormat.reachedEnd()) {
- Assert.assertNull(jdbcInputFormat.nextRecord(row));
- recordsCnt++;
- }
- jdbcInputFormat.close();
- jdbcInputFormat.closeInputFormat();
- Assert.assertEquals(0, recordsCnt);
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
deleted file mode 100644
index 086a84c..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.Row;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class JDBCOutputFormatTest extends JDBCTestBase {
-
- private JDBCOutputFormat jdbcOutputFormat;
- private Tuple5<Integer, String, String, Double, String> tuple5 = new Tuple5<>();
-
- @After
- public void tearDown() throws IOException {
- if (jdbcOutputFormat != null) {
- jdbcOutputFormat.close();
- }
- jdbcOutputFormat = null;
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidDriver() throws IOException {
- jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername("org.apache.derby.jdbc.idontexist")
- .setDBUrl(DB_URL)
- .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
- .finish();
- jdbcOutputFormat.open(0, 1);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidURL() throws IOException {
- jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
- .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
- .finish();
- jdbcOutputFormat.open(0, 1);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidQuery() throws IOException {
- jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
- .setQuery("iamnotsql")
- .finish();
- jdbcOutputFormat.open(0, 1);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testIncompleteConfiguration() throws IOException {
- jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername(DRIVER_CLASS)
- .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
- .finish();
- }
-
-
- @Test(expected = IllegalArgumentException.class)
- public void testIncompatibleTypes() throws IOException {
- jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
- .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
- .finish();
- jdbcOutputFormat.open(0, 1);
-
- tuple5.setField(4, 0);
- tuple5.setField("hello", 1);
- tuple5.setField("world", 2);
- tuple5.setField(0.99, 3);
- tuple5.setField("imthewrongtype", 4);
-
- Row row = new Row(tuple5.getArity());
- for (int i = 0; i < tuple5.getArity(); i++) {
- row.setField(i, tuple5.getField(i));
- }
- jdbcOutputFormat.writeRecord(row);
- jdbcOutputFormat.close();
- }
-
- @Test
- public void testJDBCOutputFormat() throws IOException, InstantiationException, IllegalAccessException {
-
- jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
- .setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
- .finish();
- jdbcOutputFormat.open(0, 1);
-
- for (int i = 0; i < testData.length; i++) {
- Row row = new Row(testData[i].length);
- for (int j = 0; j < testData[i].length; j++) {
- row.setField(j, testData[i][j]);
- }
- jdbcOutputFormat.writeRecord(row);
- }
-
- jdbcOutputFormat.close();
-
- try (
- Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL);
- PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
- ResultSet resultSet = statement.executeQuery()
- ) {
- int recordCount = 0;
- while (resultSet.next()) {
- Row row = new Row(tuple5.getArity());
- for (int i = 0; i < tuple5.getArity(); i++) {
- row.setField(i, resultSet.getObject(i + 1));
- }
- if (row.productElement(0) != null) {
- Assert.assertEquals("Field 0 should be int", Integer.class, row.productElement(0).getClass());
- }
- if (row.productElement(1) != null) {
- Assert.assertEquals("Field 1 should be String", String.class, row.productElement(1).getClass());
- }
- if (row.productElement(2) != null) {
- Assert.assertEquals("Field 2 should be String", String.class, row.productElement(2).getClass());
- }
- if (row.productElement(3) != null) {
- Assert.assertEquals("Field 3 should be float", Double.class, row.productElement(3).getClass());
- }
- if (row.productElement(4) != null) {
- Assert.assertEquals("Field 4 should be int", Integer.class, row.productElement(4).getClass());
- }
-
- for (int x = 0; x < tuple5.getArity(); x++) {
- if (JDBCTestBase.testData[recordCount][x] != null) {
- Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.productElement(x));
- }
- }
-
- recordCount++;
- }
- Assert.assertEquals(JDBCTestBase.testData.length, recordCount);
- } catch (SQLException e) {
- Assert.fail("JDBC OutputFormat test failed. " + e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
deleted file mode 100644
index 69ad693..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.OutputStream;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-
-/**
- * Base test class for JDBC Input and Output formats
- */
-public class JDBCTestBase {
-
- public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
- public static final String DB_URL = "jdbc:derby:memory:ebookshop";
- public static final String INPUT_TABLE = "books";
- public static final String OUTPUT_TABLE = "newbooks";
- public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
- public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE;
- public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0";
- public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)";
- public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
- public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?";
-
- protected static Connection conn;
-
- public static final Object[][] testData = {
- {1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11},
- {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
- {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
- {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
- {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55},
- {1006, ("A Teaspoon of Java 1.4"), ("Kevin Jones"), 66.66, 66},
- {1007, ("A Teaspoon of Java 1.5"), ("Kevin Jones"), 77.77, 77},
- {1008, ("A Teaspoon of Java 1.6"), ("Kevin Jones"), 88.88, 88},
- {1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 99.99, 99},
- {1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010}};
-
- public static final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO
- };
-
- public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
-
- public static String getCreateQuery(String tableName) {
- StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE ");
- sqlQueryBuilder.append(tableName).append(" (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
- return sqlQueryBuilder.toString();
- }
-
- public static String getInsertQuery() {
- StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
- for (int i = 0; i < JDBCTestBase.testData.length; i++) {
- sqlQueryBuilder.append("(")
- .append(JDBCTestBase.testData[i][0]).append(",'")
- .append(JDBCTestBase.testData[i][1]).append("','")
- .append(JDBCTestBase.testData[i][2]).append("',")
- .append(JDBCTestBase.testData[i][3]).append(",")
- .append(JDBCTestBase.testData[i][4]).append(")");
- if (i < JDBCTestBase.testData.length - 1) {
- sqlQueryBuilder.append(",");
- }
- }
- String insertQuery = sqlQueryBuilder.toString();
- return insertQuery;
- }
-
- public static final OutputStream DEV_NULL = new OutputStream() {
- @Override
- public void write(int b) {
- }
- };
-
- public static void prepareTestDb() throws Exception {
- System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
- Class.forName(DRIVER_CLASS);
- Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
-
- //create input table
- Statement stat = conn.createStatement();
- stat.executeUpdate(getCreateQuery(INPUT_TABLE));
- stat.close();
-
- //create output table
- stat = conn.createStatement();
- stat.executeUpdate(getCreateQuery(OUTPUT_TABLE));
- stat.close();
-
- //prepare input data
- stat = conn.createStatement();
- stat.execute(JDBCTestBase.getInsertQuery());
- stat.close();
-
- conn.close();
- }
-
- @BeforeClass
- public static void setUpClass() throws SQLException {
- try {
- System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
- prepareDerbyDatabase();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
- Class.forName(DRIVER_CLASS);
- conn = DriverManager.getConnection(DB_URL + ";create=true");
- createTable(INPUT_TABLE);
- createTable(OUTPUT_TABLE);
- insertDataIntoInputTable();
- conn.close();
- }
-
- private static void createTable(String tableName) throws SQLException {
- Statement stat = conn.createStatement();
- stat.executeUpdate(getCreateQuery(tableName));
- stat.close();
- }
-
- private static void insertDataIntoInputTable() throws SQLException {
- Statement stat = conn.createStatement();
- stat.execute(JDBCTestBase.getInsertQuery());
- stat.close();
- }
-
- @AfterClass
- public static void tearDownClass() {
- cleanUpDerbyDatabases();
- }
-
- private static void cleanUpDerbyDatabases() {
- try {
- Class.forName(DRIVER_CLASS);
- conn = DriverManager.getConnection(DB_URL + ";create=true");
- Statement stat = conn.createStatement();
- stat.executeUpdate("DROP TABLE "+INPUT_TABLE);
- stat.executeUpdate("DROP TABLE "+OUTPUT_TABLE);
- stat.close();
- conn.close();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties b/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties
deleted file mode 100644
index 2fb9345..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml b/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml
deleted file mode 100644
index 8b3bb27..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
- </encoder>
- </appender>
-
- <root level="WARN">
- <appender-ref ref="STDOUT"/>
- </root>
-</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/pom.xml b/flink-batch-connectors/pom.xml
deleted file mode 100644
index d4f65b3..0000000
--- a/flink-batch-connectors/pom.xml
+++ /dev/null
@@ -1,45 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-parent</artifactId>
- <version>1.2-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
-
- <artifactId>flink-batch-connectors</artifactId>
- <name>flink-batch-connectors</name>
- <packaging>pom</packaging>
-
- <modules>
- <module>flink-avro</module>
- <module>flink-jdbc</module>
- <module>flink-hadoop-compatibility</module>
- <module>flink-hbase</module>
- <module>flink-hcatalog</module>
- </modules>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/pom.xml b/flink-connectors/flink-avro/pom.xml
new file mode 100644
index 0000000..cdd7c78
--- /dev/null
+++ b/flink-connectors/flink-avro/pom.xml
@@ -0,0 +1,216 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connectors</artifactId>
+ <version>1.2-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-avro_2.10</artifactId>
+ <name>flink-avro</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <!-- core dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <!-- version is derived from base module -->
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils-junit</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>create-test-dependency</id>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.flink.api.avro.testjar.AvroExternalJarProgram</mainClass>
+ </manifest>
+ </archive>
+ <finalName>maven</finalName>
+ <attach>false</attach>
+ <descriptors>
+ <descriptor>src/test/assembly/test-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!--Remove the AvroExternalJarProgram code from the test-classes directory since it mustn't be in the
+ classpath when running the tests to actually test whether the user code class loader
+ is properly used.-->
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>2.5</version><!--$NO-MVN-MAN-VER$-->
+ <executions>
+ <execution>
+ <id>remove-avroexternalprogram</id>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>clean</goal>
+ </goals>
+ <configuration>
+ <excludeDefaultDirectories>true</excludeDefaultDirectories>
+ <filesets>
+ <fileset>
+ <directory>${project.build.testOutputDirectory}</directory>
+ <includes>
+ <include>**/testjar/*.class</include>
+ </includes>
+ </fileset>
+ </filesets>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Generate Test class from avro schema -->
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>1.7.7</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ <configuration>
+ <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
+ <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <versionRange>[2.4,)</versionRange>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-clean-plugin</artifactId>
+ <versionRange>[1,)</versionRange>
+ <goals>
+ <goal>clean</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <versionRange>[1.7.7,)</versionRange>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
new file mode 100644
index 0000000..59da4cb
--- /dev/null
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.io.Decoder;
+import org.apache.avro.util.Utf8;
+
+
+/**
+ * An Avro {@link Decoder} that reads its data from a {@link DataInput}.
+ *
+ * <p>Ints, longs, enum ordinals, union indexes and the length prefixes of bytes and strings are
+ * read through the fixed-width {@code DataInput} methods; array and map block counts are read
+ * with {@link #readVarLongCount(DataInput)}.
+ *
+ * <p>The input has to be supplied through {@link #setIn(DataInput)} before any read or skip
+ * method is called.
+ */
+public class DataInputDecoder extends Decoder {
+
+	/** Scratch instance reused by {@link #readString()} for every call. */
+	private final Utf8 stringDecoder = new Utf8();
+
+	/** The input all values are read from; {@code null} until {@link #setIn(DataInput)} is called. */
+	private DataInput in;
+
+	/**
+	 * Sets the input that all subsequent read and skip calls consume.
+	 *
+	 * @param in the input to read from
+	 */
+	public void setIn(DataInput in) {
+		this.in = in;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// primitives
+	// --------------------------------------------------------------------------------------------
+
+	/** Consumes nothing: a null value occupies no bytes in the input. */
+	@Override
+	public void readNull() {}
+
+
+	@Override
+	public boolean readBoolean() throws IOException {
+		return in.readBoolean();
+	}
+
+	@Override
+	public int readInt() throws IOException {
+		return in.readInt();
+	}
+
+	@Override
+	public long readLong() throws IOException {
+		return in.readLong();
+	}
+
+	@Override
+	public float readFloat() throws IOException {
+		return in.readFloat();
+	}
+
+	@Override
+	public double readDouble() throws IOException {
+		return in.readDouble();
+	}
+
+	/** Reads an enum value as a plain int, see {@link #readInt()}. */
+	@Override
+	public int readEnum() throws IOException {
+		return readInt();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// bytes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Reads exactly {@code length} bytes into {@code bytes}, starting at index {@code start}.
+	 *
+	 * @throws IOException if the input fails or ends before {@code length} bytes were read
+	 */
+	@Override
+	public void readFixed(byte[] bytes, int start, int length) throws IOException {
+		in.readFully(bytes, start, length);
+	}
+
+	/**
+	 * Reads an int length followed by that many bytes.
+	 *
+	 * <p>The given buffer is reused if it is non-null, backed by an accessible array and has
+	 * sufficient capacity; otherwise a new buffer of exactly the required size is allocated.
+	 *
+	 * @param old a buffer that may be reused, or {@code null}
+	 * @return a buffer positioned at 0 whose limit is the number of bytes read
+	 */
+	@Override
+	public ByteBuffer readBytes(ByteBuffer old) throws IOException {
+		int length = readInt();
+		ByteBuffer result;
+		if (old != null && length <= old.capacity() && old.hasArray()) {
+			result = old;
+			result.clear();
+		} else {
+			result = ByteBuffer.allocate(length);
+		}
+		in.readFully(result.array(), result.arrayOffset() + result.position(), length);
+		result.limit(length);
+		return result;
+	}
+
+
+	@Override
+	public void skipFixed(int length) throws IOException {
+		skipBytes(length);
+	}
+
+	/** Reads the int length prefix of a bytes value and skips that many bytes. */
+	@Override
+	public void skipBytes() throws IOException {
+		int num = readInt();
+		skipBytes(num);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// strings
+	// --------------------------------------------------------------------------------------------
+
+
+	/**
+	 * Reads an int byte length followed by that many bytes into a {@link Utf8}.
+	 *
+	 * @param old an instance to reuse, or {@code null} to have a new one created
+	 * @return {@code old} if it was non-null, otherwise a new instance, holding the bytes read
+	 */
+	@Override
+	public Utf8 readString(Utf8 old) throws IOException {
+		int length = readInt();
+		Utf8 result = (old != null ? old : new Utf8());
+		result.setByteLength(length);
+
+		if (length > 0) {
+			in.readFully(result.getBytes(), 0, length);
+		}
+
+		return result;
+	}
+
+	/** Reads a string through the internal scratch {@link Utf8} and converts it to a String. */
+	@Override
+	public String readString() throws IOException {
+		return readString(stringDecoder).toString();
+	}
+
+	/** Reads the int length prefix of a string and skips that many bytes. */
+	@Override
+	public void skipString() throws IOException {
+		int len = readInt();
+		skipBytes(len);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// collection types
+	// --------------------------------------------------------------------------------------------
+
+	// All collection methods below only read the next block count from the input and return it.
+
+	@Override
+	public long readArrayStart() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long arrayNext() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	/**
+	 * Reads and returns the next block count; the items themselves are not skipped here.
+	 */
+	@Override
+	public long skipArray() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long readMapStart() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long mapNext() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	/**
+	 * Reads and returns the next block count; the entries themselves are not skipped here.
+	 */
+	@Override
+	public long skipMap() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// union
+	// --------------------------------------------------------------------------------------------
+
+	/** Reads a union index as a plain int, see {@link #readInt()}. */
+	@Override
+	public int readIndex() throws IOException {
+		return readInt();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// utils
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Skips exactly {@code num} bytes of the input; does nothing if {@code num} is not positive.
+	 *
+	 * @throws IOException if the input fails or ends before {@code num} bytes were skipped
+	 */
+	private void skipBytes(int num) throws IOException {
+		while (num > 0) {
+			int skipped = in.skipBytes(num);
+			if (skipped > 0) {
+				num -= skipped;
+			} else {
+				// DataInput.skipBytes may skip nothing and never reports end of input, which
+				// would make this loop spin forever. Reading a single byte either makes
+				// progress or fails with an EOFException.
+				in.readByte();
+				num--;
+			}
+		}
+	}
+
+	/**
+	 * Reads a variable-length encoded count.
+	 *
+	 * <p>Each byte contributes its lower seven bits, least significant group first. A set high
+	 * bit marks that another byte follows; the first byte with a cleared high bit ends the value.
+	 *
+	 * @param in the input to read the count from
+	 * @return the decoded count
+	 * @throws IOException if the input fails or ends before the value is complete
+	 */
+	public static long readVarLongCount(DataInput in) throws IOException {
+		long value = in.readUnsignedByte();
+
+		if ((value & 0x80) == 0) {
+			// single-byte value
+			return value;
+		}
+		else {
+			long curr;
+			int shift = 7;
+			value = value & 0x7f;
+			while (((curr = in.readUnsignedByte()) & 0x80) != 0){
+				value |= (curr & 0x7f) << shift;
+				shift += 7;
+			}
+			// last byte: high bit is clear, so it can be merged without masking
+			value |= curr << shift;
+			return value;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
new file mode 100644
index 0000000..0102cc1
--- /dev/null
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.io.Encoder;
+import org.apache.avro.util.Utf8;
+
+
+/**
+ * An Avro {@link Encoder} that writes its data to a {@link DataOutput}.
+ *
+ * <p>Ints, longs, enum ordinals, union indexes and the length prefixes of bytes and strings are
+ * written through the fixed-width {@code DataOutput} methods; array and map block counts are
+ * written with {@link #writeVarLongCount(DataOutput, long)}.
+ *
+ * <p>The output has to be supplied through {@link #setOut(DataOutput)} before any write method
+ * is called. The output is not part of the serialized form of this class, so it has to be set
+ * again after deserialization.
+ */
+public final class DataOutputEncoder extends Encoder implements java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The output all values are written to. Transient because {@link DataOutput} is not
+	 * required to be serializable; otherwise serializing an encoder with an output attached
+	 * could fail.
+	 */
+	private transient DataOutput out;
+
+
+	/**
+	 * Sets the output that all subsequent write calls go to.
+	 *
+	 * @param out the output to write to
+	 */
+	public void setOut(DataOutput out) {
+		this.out = out;
+	}
+
+
+	/** Does nothing; this encoder passes every write straight to the output. */
+	@Override
+	public void flush() throws IOException {}
+
+	// --------------------------------------------------------------------------------------------
+	// primitives
+	// --------------------------------------------------------------------------------------------
+
+	/** Writes nothing: a null value occupies no bytes in the output. */
+	@Override
+	public void writeNull() {}
+
+
+	@Override
+	public void writeBoolean(boolean b) throws IOException {
+		out.writeBoolean(b);
+	}
+
+	@Override
+	public void writeInt(int n) throws IOException {
+		out.writeInt(n);
+	}
+
+	@Override
+	public void writeLong(long n) throws IOException {
+		out.writeLong(n);
+	}
+
+	@Override
+	public void writeFloat(float f) throws IOException {
+		out.writeFloat(f);
+	}
+
+	@Override
+	public void writeDouble(double d) throws IOException {
+		out.writeDouble(d);
+	}
+
+	/** Writes an enum value as a plain int. */
+	@Override
+	public void writeEnum(int e) throws IOException {
+		out.writeInt(e);
+	}
+
+
+	// --------------------------------------------------------------------------------------------
+	// bytes
+	// --------------------------------------------------------------------------------------------
+
+	/** Writes {@code len} bytes of {@code bytes}, starting at {@code start}, without a length prefix. */
+	@Override
+	public void writeFixed(byte[] bytes, int start, int len) throws IOException {
+		out.write(bytes, start, len);
+	}
+
+	/** Writes {@code len} as an int, followed by {@code len} bytes of {@code bytes} starting at {@code start}. */
+	@Override
+	public void writeBytes(byte[] bytes, int start, int len) throws IOException {
+		out.writeInt(len);
+		if (len > 0) {
+			out.write(bytes, start, len);
+		}
+	}
+
+	/**
+	 * Writes the number of remaining bytes of the buffer as an int, followed by the bytes
+	 * themselves, which are written through the inherited {@code writeFixed(ByteBuffer)}.
+	 */
+	@Override
+	public void writeBytes(ByteBuffer bytes) throws IOException {
+		int num = bytes.remaining();
+		out.writeInt(num);
+
+		if (num > 0) {
+			writeFixed(bytes);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// strings
+	// --------------------------------------------------------------------------------------------
+
+	/** Writes the string as a length-prefixed byte sequence, see {@link #writeBytes(byte[], int, int)}. */
+	@Override
+	public void writeString(String str) throws IOException {
+		byte[] bytes = Utf8.getBytesFor(str);
+		writeBytes(bytes, 0, bytes.length);
+	}
+
+	/** Writes the first {@code getByteLength()} bytes of the given {@link Utf8}, prefixed by that length. */
+	@Override
+	public void writeString(Utf8 utf8) throws IOException {
+		writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
+
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// collection types
+	// --------------------------------------------------------------------------------------------
+
+	/** Writes nothing; counts are written by {@link #setItemCount(long)}. */
+	@Override
+	public void writeArrayStart() {}
+
+	/**
+	 * Writes the item count of the next block as a variable-length value. A count that is not
+	 * positive writes nothing; the terminating zero is written by {@link #writeArrayEnd()} or
+	 * {@link #writeMapEnd()}.
+	 */
+	@Override
+	public void setItemCount(long itemCount) throws IOException {
+		if (itemCount > 0) {
+			writeVarLongCount(out, itemCount);
+		}
+	}
+
+	/** Writes nothing; items are not individually delimited. */
+	@Override
+	public void startItem() {}
+
+	@Override
+	public void writeArrayEnd() throws IOException {
+		// write a single byte 0, shortcut for a var-length long of 0
+		out.write(0);
+	}
+
+	/** Writes nothing; counts are written by {@link #setItemCount(long)}. */
+	@Override
+	public void writeMapStart() {}
+
+	@Override
+	public void writeMapEnd() throws IOException {
+		// write a single byte 0, shortcut for a var-length long of 0
+		out.write(0);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// union
+	// --------------------------------------------------------------------------------------------
+
+	/** Writes a union index as a plain int. */
+	@Override
+	public void writeIndex(int unionIndex) throws IOException {
+		out.writeInt(unionIndex);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// utils
+	// --------------------------------------------------------------------------------------------
+
+
+	/**
+	 * Writes a non-negative count in a variable-length encoding.
+	 *
+	 * <p>The value is emitted seven bits per byte, least significant group first. Every byte
+	 * except the last has its high bit set to mark that another byte follows.
+	 *
+	 * @param out the output to write the count to
+	 * @param val the count to write, must not be negative
+	 * @throws IOException if {@code val} is negative or the output fails
+	 */
+	public static void writeVarLongCount(DataOutput out, long val) throws IOException {
+		if (val < 0) {
+			throw new IOException("Illegal count (must be non-negative): " + val);
+		}
+
+		while ((val & ~0x7FL) != 0) {
+			// DataOutput.write(int) emits only the low eight bits: seven payload bits plus the flag
+			out.write(((int) val) | 0x80);
+			val >>>= 7;
+		}
+		out.write((int) val);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
new file mode 100644
index 0000000..709c4f1
--- /dev/null
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.avro;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.avro.file.SeekableInput;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+
+/**
+ * Adapts a Flink {@link FSDataInputStream} to Avro's {@link SeekableInput}.
+ *
+ * <p>Code copied from org.apache.avro.mapred.FSInput (which is Apache licensed as well).
+ *
+ * <p>The wrapper keeps track of the position in the data stream itself: the position starts at
+ * zero, is advanced by every successful read and is reset by every seek.
+ */
+public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
+	/** The wrapped stream that all reads and seeks are forwarded to. */
+	private final FSDataInputStream stream;
+	/** Position as tracked by this wrapper. */
+	private long pos;
+	/** Length reported by {@link #length()}, as passed to the constructor. */
+	private final long len;
+
+	/**
+	 * Creates a wrapper around the given stream, with the tracked position set to zero.
+	 *
+	 * @param stream the stream to wrap
+	 * @param len the value to report from {@link #length()}
+	 */
+	public FSDataInputStreamWrapper(FSDataInputStream stream, long len) {
+		this.stream = stream;
+		this.pos = 0;
+		this.len = len;
+	}
+
+	/** Returns the length that was passed to the constructor. */
+	@Override
+	public long length() throws IOException {
+		return this.len;
+	}
+
+	/**
+	 * Reads from the wrapped stream and advances the tracked position by the number of bytes
+	 * actually read.
+	 *
+	 * @return the value returned by the wrapped stream's {@code read}
+	 */
+	@Override
+	public int read(byte[] b, int off, int len) throws IOException {
+		int read = stream.read(b, off, len);
+		// A non-positive result means nothing was read (-1 signals end of stream); adding it
+		// would move the tracked position backwards and corrupt tell().
+		if (read > 0) {
+			pos += read;
+		}
+		return read;
+	}
+
+	/** Seeks the wrapped stream to {@code p} and, if that succeeds, records {@code p} as the position. */
+	@Override
+	public void seek(long p) throws IOException {
+		stream.seek(p);
+		pos = p;
+	}
+
+	/** Returns the position as tracked by this wrapper. */
+	@Override
+	public long tell() throws IOException {
+		return pos;
+	}
+
+	/** Closes the wrapped stream. */
+	@Override
+	public void close() throws IOException {
+		stream.close();
+	}
+}