Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:09 UTC
[19/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
new file mode 100644
index 0000000..2ed2f8c
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc.split;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+
+/**
+ * This splits generator does nothing but wrap the query parameters
+ * computed by the user before creating the {@link JDBCInputFormat} instance.
+ */
+public class GenericParameterValuesProvider implements ParameterValuesProvider {
+
+ private final Serializable[][] parameters;
+
+ public GenericParameterValuesProvider(Serializable[][] parameters) {
+ this.parameters = parameters;
+ }
+
+ @Override
+ public Serializable[][] getParameterValues(){
+ //do nothing...precomputed externally
+ return parameters;
+ }
+
+}
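
A typical use, mirroring the input format tests added later in this commit (the rowTypeInfo variable and the table/column names are illustrative, not part of this file):

    Serializable[][] queryParameters = new String[2][1];
    queryParameters[0] = new String[]{"Kumar"};
    queryParameters[1] = new String[]{"Tan Ah Teck"};
    JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
        .setDBUrl("jdbc:derby:memory:ebookshop")
        .setQuery("select * from books WHERE author = ?")
        .setRowTypeInfo(rowTypeInfo)
        .setParametersProvider(new GenericParameterValuesProvider(queryParameters))
        .finish();

Each row of the matrix parameterizes one split, so this produces two parallel queries.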
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
new file mode 100644
index 0000000..ac56b98
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc.split;
+
+import java.io.Serializable;
+
+/**
+ * This query parameters generator assumes that the query to parameterize contains a BETWEEN constraint on a numeric column.
+ * Each generated split covers a range of fetchSize values (except possibly the last range),
+ * going from the min value up to the max.
+ *
+ * <p>For example, given a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE> and a query like:
+ * <PRE>
+ * SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
+ * </PRE>
+ *
+ * you can use this class to automatically generate the parameters of the BETWEEN clause,
+ * based on the constructor parameters.
+ */
+public class NumericBetweenParametersProvider implements ParameterValuesProvider {
+
+ private final long fetchSize;
+ private final long min;
+ private final long max;
+
+ public NumericBetweenParametersProvider(long fetchSize, long min, long max) {
+ this.fetchSize = fetchSize;
+ this.min = min;
+ this.max = max;
+ }
+
+ @Override
+ public Serializable[][] getParameterValues() {
+ long maxElemCount = (max - min) + 1;
+ int size = (int) Math.ceil((double) maxElemCount / fetchSize);
+ Serializable[][] parameters = new Serializable[size][2];
+ long start = min;
+ for (int i = 0; i < size; i++) {
+ //clamp the last range to max so the final split never exceeds the data range
+ long end = Math.min(start + fetchSize - 1, max);
+ parameters[i] = new Long[]{start, end};
+ start = end + 1;
+ }
+ return parameters;
+ }
+
+}
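
To make the range arithmetic concrete, a quick worked example (not part of the file above): with min=0, max=9 and fetchSize=4, the provider yields three splits, the last one clamped to max:

    NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(4, 0, 9);
    Serializable[][] params = provider.getParameterValues();
    // params[0] == {0L, 3L}, params[1] == {4L, 7L}, params[2] == {8L, 9L}

Each pair binds the two placeholders of the BETWEEN clause for one split.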
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
new file mode 100644
index 0000000..c194497
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc.split;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+
+/**
+ * This interface is used by the {@link JDBCInputFormat} to compute the list of parallel queries to run (i.e. the splits).
+ * Each query will be parameterized with a row of the matrix provided by the {@link ParameterValuesProvider} implementation.
+ */
+public interface ParameterValuesProvider {
+
+ /** Returns the parameters array used to query the table in parallel. */
+ Serializable[][] getParameterValues();
+
+}
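
Implementations only need to return a matrix whose rows match the placeholders of the query. As a hypothetical sketch (not part of this commit), a provider in the same package that splits a query like "... WHERE author LIKE ?" by author prefix could look like:

    import java.io.Serializable;

    public class AuthorPrefixParametersProvider implements ParameterValuesProvider {

        private final String[] prefixes;

        public AuthorPrefixParametersProvider(String[] prefixes) {
            this.prefixes = prefixes;
        }

        @Override
        public Serializable[][] getParameterValues() {
            Serializable[][] parameters = new Serializable[prefixes.length][1];
            for (int i = 0; i < prefixes.length; i++) {
                // each row feeds the single LIKE ? placeholder of one split
                parameters[i] = new String[]{prefixes[i] + "%"};
            }
            return parameters;
        }
    }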
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
new file mode 100644
index 0000000..da9469b
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
+import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
+import org.apache.flink.api.table.Row;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JDBCFullTest extends JDBCTestBase {
+
+ @Test
+ public void testJdbcInOut() throws Exception {
+ //run without parallelism
+ runTest(false);
+
+ //cleanup
+ JDBCTestBase.tearDownClass();
+ JDBCTestBase.prepareTestDb();
+
+ //run exploiting parallelism
+ runTest(true);
+
+ }
+
+ private void runTest(boolean exploitParallelism) {
+ ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+ JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
+ .setDrivername(JDBCTestBase.DRIVER_CLASS)
+ .setDBUrl(JDBCTestBase.DB_URL)
+ .setQuery(JDBCTestBase.SELECT_ALL_BOOKS)
+ .setRowTypeInfo(rowTypeInfo);
+
+ if (exploitParallelism) {
+ final int fetchSize = 1;
+ final long min = Long.parseLong(JDBCTestBase.testData[0][0].toString());
+ final long max = Long.parseLong(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0].toString());
+ //use a "splittable" query to exploit parallelism
+ inputBuilder = inputBuilder
+ .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
+ .setParametersProvider(new NumericBetweenParametersProvider(fetchSize, min, max));
+ }
+ DataSet<Row> source = environment.createInput(inputBuilder.finish());
+
+ //NOTE: with the Derby driver setSqlTypes() could be skipped, but
+ //some databases don't handle null values correctly when no column type is specified
+ //in PreparedStatement.setObject (see its javadoc for more details)
+ source.output(JDBCOutputFormat.buildJDBCOutputFormat()
+ .setDrivername(JDBCTestBase.DRIVER_CLASS)
+ .setDBUrl(JDBCTestBase.DB_URL)
+ .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
+ .setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.DOUBLE, Types.INTEGER})
+ .finish());
+ try {
+ environment.execute();
+ } catch (Exception e) {
+ Assert.fail("JDBC full test failed. " + e.getMessage());
+ }
+
+ try (
+ Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL);
+ PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
+ ResultSet resultSet = statement.executeQuery()
+ ) {
+ int count = 0;
+ while (resultSet.next()) {
+ count++;
+ }
+ Assert.assertEquals(JDBCTestBase.testData.length, count);
+ } catch (SQLException e) {
+ Assert.fail("JDBC full test failed. " + e.getMessage());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
new file mode 100644
index 0000000..efae076
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.ResultSet;
+
+import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
+import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
+import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.core.io.InputSplit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JDBCInputFormatTest extends JDBCTestBase {
+
+ private JDBCInputFormat jdbcInputFormat;
+
+ @After
+ public void tearDown() throws IOException {
+ if (jdbcInputFormat != null) {
+ jdbcInputFormat.close();
+ }
+ jdbcInputFormat = null;
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testUntypedRowInfo() throws IOException {
+ jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ .setDrivername("org.apache.derby.jdbc.idontexist")
+ .setDBUrl(DB_URL)
+ .setQuery(SELECT_ALL_BOOKS)
+ .finish();
+ jdbcInputFormat.openInputFormat();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidDriver() throws IOException {
+ jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ .setDrivername("org.apache.derby.jdbc.idontexist")
+ .setDBUrl(DB_URL)
+ .setQuery(SELECT_ALL_BOOKS)
+ .setRowTypeInfo(rowTypeInfo)
+ .finish();
+ jdbcInputFormat.openInputFormat();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidURL() throws IOException {
+ jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ .setDrivername(DRIVER_CLASS)
+ .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
+ .setQuery(SELECT_ALL_BOOKS)
+ .setRowTypeInfo(rowTypeInfo)
+ .finish();
+ jdbcInputFormat.openInputFormat();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidQuery() throws IOException {
+ jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ .setDrivername(DRIVER_CLASS)
+ .setDBUrl(DB_URL)
+ .setQuery("iamnotsql")
+ .setRowTypeInfo(rowTypeInfo)
+ .finish();
+ jdbcInputFormat.openInputFormat();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIncompleteConfiguration() throws IOException {
+ jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ .setDrivername(DRIVER_CLASS)
+ .setQuery(SELECT_ALL_BOOKS)
+ .setRowTypeInfo(rowTypeInfo)
+ .finish();
+ }
+
+ @Test
+ public void testJDBCInputFormatWithoutParallelism() throws IOException, InstantiationException, IllegalAccessException {
+ jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ .setDrivername(DRIVER_CLASS)
+ .setDBUrl(DB_URL)
+ .setQuery(SELECT_ALL_BOOKS)
+ .setRowTypeInfo(rowTypeInfo)
+ .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
+ .finish();
+ //this query does not exploit parallelism
+ Assert.assertEquals(1, jdbcInputFormat.createInputSplits(1).length);
+ jdbcInputFormat.openInputFormat();
+ jdbcInputFormat.open(null);
+ Row row = new Row(5);
+ int recordCount = 0;
+ while (!jdbcInputFormat.reachedEnd()) {
+ Row next = jdbcInputFormat.nextRecord(row);
+ if (next == null) {
+ break;
+ }
+
+ if (next.productElement(0) != null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass()); }
+ if (next.productElement(1) != null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass()); }
+ if (next.productElement(2) != null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass()); }
+ if (next.productElement(3) != null) { Assert.assertEquals("Field 3 should be double", Double.class, next.productElement(3).getClass()); }
+ if (next.productElement(4) != null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass()); }
+
+ for (int x = 0; x < 5; x++) {
+ if (testData[recordCount][x] != null) {
+ Assert.assertEquals(testData[recordCount][x], next.productElement(x));
+ }
+ }
+ recordCount++;
+ }
+ jdbcInputFormat.close();
+ jdbcInputFormat.closeInputFormat();
+ Assert.assertEquals(testData.length, recordCount);
+ }
+
+ @Test
+ public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException {
+ final int fetchSize = 1;
+ final Long min = new Long(JDBCTestBase.testData[0][0] + "");
+ final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0] + "");
+ ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
+ jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ .setDrivername(DRIVER_CLASS)
+ .setDBUrl(DB_URL)
+ .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
+ .setRowTypeInfo(rowTypeInfo)
+ .setParametersProvider(paramProvider)
+ .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
+ .finish();
+
+ jdbcInputFormat.openInputFormat();
+ InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
+ //this query exploits parallelism (1 split for every id)
+ Assert.assertEquals(testData.length, splits.length);
+ int recordCount = 0;
+ Row row = new Row(5);
+ for (int i = 0; i < splits.length; i++) {
+ jdbcInputFormat.open(splits[i]);
+ while (!jdbcInputFormat.reachedEnd()) {
+ Row next = jdbcInputFormat.nextRecord(row);
+ if (next == null) {
+ break;
+ }
+ if (next.productElement(0) != null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass()); }
+ if (next.productElement(1) != null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass()); }
+ if (next.productElement(2) != null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass()); }
+ if (next.productElement(3) != null) { Assert.assertEquals("Field 3 should be double", Double.class, next.productElement(3).getClass()); }
+ if (next.productElement(4) != null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass()); }
+
+ for (int x = 0; x < 5; x++) {
+ if (testData[recordCount][x] != null) {
+ Assert.assertEquals(testData[recordCount][x], next.productElement(x));
+ }
+ }
+ recordCount++;
+ }
+ jdbcInputFormat.close();
+ }
+ jdbcInputFormat.closeInputFormat();
+ Assert.assertEquals(testData.length, recordCount);
+ }
+
+ @Test
+ public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException, InstantiationException, IllegalAccessException {
+ Serializable[][] queryParameters = new String[2][1];
+ queryParameters[0] = new String[]{"Kumar"};
+ queryParameters[1] = new String[]{"Tan Ah Teck"};
+ ParameterValuesProvider paramProvider = new GenericParameterValuesProvider(queryParameters);
+ jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ .setDrivername(DRIVER_CLASS)
+ .setDBUrl(DB_URL)
+ .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
+ .setRowTypeInfo(rowTypeInfo)
+ .setParametersProvider(paramProvider)
+ .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
+ .finish();
+ jdbcInputFormat.openInputFormat();
+ InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
+ //this query exploits parallelism (1 split for every queryParameters row)
+ Assert.assertEquals(queryParameters.length, splits.length);
+ int recordCount = 0;
+ Row row = new Row(5);
+ for (int i = 0; i < splits.length; i++) {
+ jdbcInputFormat.open(splits[i]);
+ while (!jdbcInputFormat.reachedEnd()) {
+ Row next = jdbcInputFormat.nextRecord(row);
+ if (next == null) {
+ break;
+ }
+ if (next.productElement(0) != null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass()); }
+ if (next.productElement(1) != null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass()); }
+ if (next.productElement(2) != null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass()); }
+ if (next.productElement(3) != null) { Assert.assertEquals("Field 3 should be double", Double.class, next.productElement(3).getClass()); }
+ if (next.productElement(4) != null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass()); }
+
+ recordCount++;
+ }
+ jdbcInputFormat.close();
+ }
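+ //per the test data, Kumar has 1 book and Tan Ah Teck has 2, so the two splits return 3 records overall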
+ Assert.assertEquals(3, recordCount);
+ jdbcInputFormat.closeInputFormat();
+ }
+
+ @Test
+ public void testEmptyResults() throws IOException, InstantiationException, IllegalAccessException {
+ jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ .setDrivername(DRIVER_CLASS)
+ .setDBUrl(DB_URL)
+ .setQuery(SELECT_EMPTY)
+ .setRowTypeInfo(rowTypeInfo)
+ .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
+ .finish();
+ jdbcInputFormat.openInputFormat();
+ jdbcInputFormat.open(null);
+ Row row = new Row(5);
+ int recordsCnt = 0;
+ while (!jdbcInputFormat.reachedEnd()) {
+ Assert.assertNull(jdbcInputFormat.nextRecord(row));
+ recordsCnt++;
+ }
+ jdbcInputFormat.close();
+ jdbcInputFormat.closeInputFormat();
+ Assert.assertEquals(0, recordsCnt);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
new file mode 100644
index 0000000..086a84c
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JDBCOutputFormatTest extends JDBCTestBase {
+
+ private JDBCOutputFormat jdbcOutputFormat;
+ private Tuple5<Integer, String, String, Double, String> tuple5 = new Tuple5<>();
+
+ @After
+ public void tearDown() throws IOException {
+ if (jdbcOutputFormat != null) {
+ jdbcOutputFormat.close();
+ }
+ jdbcOutputFormat = null;
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidDriver() throws IOException {
+ jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+ .setDrivername("org.apache.derby.jdbc.idontexist")
+ .setDBUrl(DB_URL)
+ .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
+ .finish();
+ jdbcOutputFormat.open(0, 1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidURL() throws IOException {
+ jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+ .setDrivername(DRIVER_CLASS)
+ .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
+ .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
+ .finish();
+ jdbcOutputFormat.open(0, 1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidQuery() throws IOException {
+ jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+ .setDrivername(DRIVER_CLASS)
+ .setDBUrl(DB_URL)
+ .setQuery("iamnotsql")
+ .finish();
+ jdbcOutputFormat.open(0, 1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIncompleteConfiguration() throws IOException {
+ jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+ .setDrivername(DRIVER_CLASS)
+ .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
+ .finish();
+ }
+
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIncompatibleTypes() throws IOException {
+ jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+ .setDrivername(DRIVER_CLASS)
+ .setDBUrl(DB_URL)
+ .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
+ .finish();
+ jdbcOutputFormat.open(0, 1);
+
+ tuple5.setField(4, 0);
+ tuple5.setField("hello", 1);
+ tuple5.setField("world", 2);
+ tuple5.setField(0.99, 3);
+ tuple5.setField("imthewrongtype", 4);
+
+ Row row = new Row(tuple5.getArity());
+ for (int i = 0; i < tuple5.getArity(); i++) {
+ row.setField(i, tuple5.getField(i));
+ }
+ jdbcOutputFormat.writeRecord(row);
+ jdbcOutputFormat.close();
+ }
+
+ @Test
+ public void testJDBCOutputFormat() throws IOException, InstantiationException, IllegalAccessException {
+
+ jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+ .setDrivername(DRIVER_CLASS)
+ .setDBUrl(DB_URL)
+ .setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
+ .finish();
+ jdbcOutputFormat.open(0, 1);
+
+ for (int i = 0; i < testData.length; i++) {
+ Row row = new Row(testData[i].length);
+ for (int j = 0; j < testData[i].length; j++) {
+ row.setField(j, testData[i][j]);
+ }
+ jdbcOutputFormat.writeRecord(row);
+ }
+
+ jdbcOutputFormat.close();
+
+ try (
+ Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL);
+ PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
+ ResultSet resultSet = statement.executeQuery()
+ ) {
+ int recordCount = 0;
+ while (resultSet.next()) {
+ Row row = new Row(tuple5.getArity());
+ for (int i = 0; i < tuple5.getArity(); i++) {
+ row.setField(i, resultSet.getObject(i + 1));
+ }
+ if (row.productElement(0) != null) {
+ Assert.assertEquals("Field 0 should be int", Integer.class, row.productElement(0).getClass());
+ }
+ if (row.productElement(1) != null) {
+ Assert.assertEquals("Field 1 should be String", String.class, row.productElement(1).getClass());
+ }
+ if (row.productElement(2) != null) {
+ Assert.assertEquals("Field 2 should be String", String.class, row.productElement(2).getClass());
+ }
+ if (row.productElement(3) != null) {
+ Assert.assertEquals("Field 3 should be float", Double.class, row.productElement(3).getClass());
+ }
+ if (row.productElement(4) != null) {
+ Assert.assertEquals("Field 4 should be int", Integer.class, row.productElement(4).getClass());
+ }
+
+ for (int x = 0; x < tuple5.getArity(); x++) {
+ if (JDBCTestBase.testData[recordCount][x] != null) {
+ Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.productElement(x));
+ }
+ }
+
+ recordCount++;
+ }
+ Assert.assertEquals(JDBCTestBase.testData.length, recordCount);
+ } catch (SQLException e) {
+ Assert.fail("JDBC OutputFormat test failed. " + e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
new file mode 100644
index 0000000..69ad693
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+/**
+ * Base test class for JDBC input and output formats.
+ */
+public class JDBCTestBase {
+
+ public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
+ public static final String DB_URL = "jdbc:derby:memory:ebookshop";
+ public static final String INPUT_TABLE = "books";
+ public static final String OUTPUT_TABLE = "newbooks";
+ public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
+ public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE;
+ public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0";
+ public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)";
+ public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
+ public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?";
+
+ protected static Connection conn;
+
+ public static final Object[][] testData = {
+ {1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11},
+ {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
+ {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
+ {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
+ {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55},
+ {1006, ("A Teaspoon of Java 1.4"), ("Kevin Jones"), 66.66, 66},
+ {1007, ("A Teaspoon of Java 1.5"), ("Kevin Jones"), 77.77, 77},
+ {1008, ("A Teaspoon of Java 1.6"), ("Kevin Jones"), 88.88, 88},
+ {1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 99.99, 99},
+ {1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010}};
+
+ public static final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO
+ };
+
+ public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
+
+ public static String getCreateQuery(String tableName) {
+ StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE ");
+ sqlQueryBuilder.append(tableName).append(" (");
+ sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+ sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+ sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+ sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+ sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+ sqlQueryBuilder.append("PRIMARY KEY (id))");
+ return sqlQueryBuilder.toString();
+ }
+
+ public static String getInsertQuery() {
+ StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
+ for (int i = 0; i < JDBCTestBase.testData.length; i++) {
+ sqlQueryBuilder.append("(")
+ .append(JDBCTestBase.testData[i][0]).append(",'")
+ .append(JDBCTestBase.testData[i][1]).append("','")
+ .append(JDBCTestBase.testData[i][2]).append("',")
+ .append(JDBCTestBase.testData[i][3]).append(",")
+ .append(JDBCTestBase.testData[i][4]).append(")");
+ if (i < JDBCTestBase.testData.length - 1) {
+ sqlQueryBuilder.append(",");
+ }
+ }
+ String insertQuery = sqlQueryBuilder.toString();
+ return insertQuery;
+ }
+
+ public static final OutputStream DEV_NULL = new OutputStream() {
+ @Override
+ public void write(int b) {
+ }
+ };
+
+ public static void prepareTestDb() throws Exception {
+ System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+ Class.forName(DRIVER_CLASS);
+ Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
+
+ //create input table
+ Statement stat = conn.createStatement();
+ stat.executeUpdate(getCreateQuery(INPUT_TABLE));
+ stat.close();
+
+ //create output table
+ stat = conn.createStatement();
+ stat.executeUpdate(getCreateQuery(OUTPUT_TABLE));
+ stat.close();
+
+ //prepare input data
+ stat = conn.createStatement();
+ stat.execute(JDBCTestBase.getInsertQuery());
+ stat.close();
+
+ conn.close();
+ }
+
+ @BeforeClass
+ public static void setUpClass() throws SQLException {
+ try {
+ System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+ prepareDerbyDatabase();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
+ Class.forName(DRIVER_CLASS);
+ conn = DriverManager.getConnection(DB_URL + ";create=true");
+ createTable(INPUT_TABLE);
+ createTable(OUTPUT_TABLE);
+ insertDataIntoInputTable();
+ conn.close();
+ }
+
+ private static void createTable(String tableName) throws SQLException {
+ Statement stat = conn.createStatement();
+ stat.executeUpdate(getCreateQuery(tableName));
+ stat.close();
+ }
+
+ private static void insertDataIntoInputTable() throws SQLException {
+ Statement stat = conn.createStatement();
+ stat.execute(JDBCTestBase.getInsertQuery());
+ stat.close();
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ cleanUpDerbyDatabases();
+ }
+
+ private static void cleanUpDerbyDatabases() {
+ try {
+ Class.forName(DRIVER_CLASS);
+ conn = DriverManager.getConnection(DB_URL + ";create=true");
+ Statement stat = conn.createStatement();
+ stat.executeUpdate("DROP TABLE "+INPUT_TABLE);
+ stat.executeUpdate("DROP TABLE "+OUTPUT_TABLE);
+ stat.close();
+ conn.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties b/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2fb9345
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties
@@ -0,0 +1,19 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/resources/logback-test.xml b/flink-connectors/flink-jdbc/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
new file mode 100644
index 0000000..dcb33eb
--- /dev/null
+++ b/flink-connectors/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-parent</artifactId>
+ <version>1.2-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+
+ <artifactId>flink-connectors</artifactId>
+ <name>flink-connectors</name>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>flink-avro</module>
+ <module>flink-jdbc</module>
+ <module>flink-hadoop-compatibility</module>
+ <module>flink-hbase</module>
+ <module>flink-hcatalog</module>
+ <module>flink-connector-flume</module>
+ <module>flink-connector-kafka-base</module>
+ <module>flink-connector-kafka-0.8</module>
+ <module>flink-connector-kafka-0.9</module>
+ <module>flink-connector-kafka-0.10</module>
+ <module>flink-connector-elasticsearch</module>
+ <module>flink-connector-elasticsearch2</module>
+ <module>flink-connector-rabbitmq</module>
+ <module>flink-connector-twitter</module>
+ <module>flink-connector-nifi</module>
+ <module>flink-connector-cassandra</module>
+ <module>flink-connector-redis</module>
+ <module>flink-connector-filesystem</module>
+ </modules>
+
+ <!-- See main pom.xml for explanation of profiles -->
+ <profiles>
+ <!--
+ We include the kinesis module only optionally because it contains a dependency
+ licensed under the "Amazon Software License".
+ In accordance with the discussion in https://issues.apache.org/jira/browse/LEGAL-198
+ this is an optional module for Flink.
+ -->
+ <profile>
+ <id>include-kinesis</id>
+ <modules>
+ <module>flink-connector-kinesis</module>
+ </modules>
+ </profile>
+ </profiles>
+
+</project>
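
Because the Kinesis connector sits behind a Maven profile, it is only built when the profile is activated explicitly, e.g.:

    mvn clean install -Pinclude-kinesis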
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/pom.xml b/flink-streaming-connectors/flink-connector-cassandra/pom.xml
deleted file mode 100644
index 3a1731c..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/pom.xml
+++ /dev/null
@@ -1,179 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-connectors</artifactId>
- <version>1.2-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-connector-cassandra_2.10</artifactId>
- <name>flink-connector-cassandra</name>
-
- <packaging>jar</packaging>
-
- <!-- Allow users to pass custom connector versions -->
- <properties>
- <cassandra.version>2.2.5</cassandra.version>
- <driver.version>3.0.0</driver.version>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <reuseForks>true</reuseForks>
- <forkCount>1</forkCount>
- <argLine>-Xms256m -Xmx2800m -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.4.1</version>
- <executions>
- <!-- Run shade goal on package phase -->
- <execution>
- <id>shade-flink</id>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration combine.self="override">
- <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
- <artifactSet>
- <includes>
- <include>com.datastax.cassandra:cassandra-driver-core</include>
- <include>com.datastax.cassandra:cassandra-driver-mapping</include>
- <include>com.google.guava:guava</include>
- </includes>
- </artifactSet>
- <relocations>
- <relocation>
- <pattern>com.google</pattern>
- <shadedPattern>org.apache.flink.cassandra.shaded.com.google</shadedPattern>
- <excludes>
- <exclude>com.google.protobuf.**</exclude>
- <exclude>com.google.inject.**</exclude>
- </excludes>
- </relocation>
- </relocations>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.datastax.cassandra</groupId>
- <artifactId>cassandra-driver-core</artifactId>
- <version>${driver.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.datastax.cassandra</groupId>
- <artifactId>cassandra-driver-mapping</artifactId>
- <version>${driver.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-tests_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.cassandra</groupId>
- <artifactId>cassandra-all</artifactId>
- <version>${cassandra.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
deleted file mode 100644
index 849e023..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.batch.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.google.common.base.Strings;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.NonParallelInput;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * InputFormat to read data from Apache Cassandra and generate ${@link Tuple}.
- *
- * @param <OUT> type of Tuple
- */
-public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
- private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormat.class);
-
- private final String query;
- private final ClusterBuilder builder;
-
- private transient Cluster cluster;
- private transient Session session;
- private transient ResultSet resultSet;
-
- public CassandraInputFormat(String query, ClusterBuilder builder) {
- Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty");
- Preconditions.checkArgument(builder != null, "Builder cannot be null");
-
- this.query = query;
- this.builder = builder;
- }
-
- @Override
- public void configure(Configuration parameters) {
- this.cluster = builder.getCluster();
- }
-
- @Override
- public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
- return cachedStatistics;
- }
-
- /**
- * Opens a Session and executes the query.
- *
- * @param ignored
- * @throws IOException
- */
- @Override
- public void open(InputSplit ignored) throws IOException {
- this.session = cluster.connect();
- this.resultSet = session.execute(query);
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- return resultSet.isExhausted();
- }
-
- @Override
- public OUT nextRecord(OUT reuse) throws IOException {
- final Row item = resultSet.one();
- for (int i = 0; i < reuse.getArity(); i++) {
- reuse.setField(item.getObject(i), i);
- }
- return reuse;
- }
-
- @Override
- public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
- GenericInputSplit[] split = {new GenericInputSplit(0, 1)};
- return split;
- }
-
- @Override
- public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
- return new DefaultInputSplitAssigner(inputSplits);
- }
-
- /**
- * Closes all resources used.
- */
- @Override
- public void close() throws IOException {
- try {
- if (session != null) {
- session.close();
- }
- } catch (Exception e) {
- LOG.error("Error while closing session.", e);
- }
-
- try {
- if (cluster != null ) {
- cluster.close();
- }
- } catch (Exception e) {
- LOG.error("Error while closing cluster.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
deleted file mode 100644
index 15d8fb3..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.batch.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Session;
-import com.google.common.base.Strings;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.apache.flink.api.common.io.RichOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * OutputFormat to write {@link org.apache.flink.api.java.tuple.Tuple} into Apache Cassandra.
- *
- * @param <OUT> type of Tuple
- */
-public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
- private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);
-
- private final String insertQuery;
- private final ClusterBuilder builder;
-
- private transient Cluster cluster;
- private transient Session session;
- private transient PreparedStatement prepared;
- private transient FutureCallback<ResultSet> callback;
- private transient Throwable exception = null;
-
- public CassandraOutputFormat(String insertQuery, ClusterBuilder builder) {
- Preconditions.checkArgument(!Strings.isNullOrEmpty(insertQuery), "Query cannot be null or empty");
- Preconditions.checkArgument(builder != null, "Builder cannot be null");
-
- this.insertQuery = insertQuery;
- this.builder = builder;
- }
-
- @Override
- public void configure(Configuration parameters) {
- this.cluster = builder.getCluster();
- }
-
- /**
- * Opens a Session to Cassandra and initializes the prepared statement.
- *
- * @param taskNumber The number of the parallel instance.
- * @throws IOException Thrown, if the output could not be opened due to an
- * I/O problem.
- */
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
- this.session = cluster.connect();
- this.prepared = session.prepare(insertQuery);
- this.callback = new FutureCallback<ResultSet>() {
- @Override
- public void onSuccess(ResultSet ignored) {
- }
-
- @Override
- public void onFailure(Throwable t) {
- exception = t;
- }
- };
- }
-
- @Override
- public void writeRecord(OUT record) throws IOException {
- if (exception != null) {
- throw new IOException("write record failed", exception);
- }
-
- Object[] fields = new Object[record.getArity()];
- for (int i = 0; i < record.getArity(); i++) {
- fields[i] = record.getField(i);
- }
- ResultSetFuture result = session.executeAsync(prepared.bind(fields));
- Futures.addCallback(result, callback);
- }
-
- /**
- * Closes all resources used.
- */
- @Override
- public void close() throws IOException {
- try {
- if (session != null) {
- session.close();
- }
- } catch (Exception e) {
- LOG.error("Error while closing session.", e);
- }
-
- try {
- if (cluster != null) {
- cluster.close();
- }
- } catch (Exception e) {
- LOG.error("Error while closing cluster.", e);
- }
- }
-}
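
The CassandraOutputFormat removed above is re-added under flink-connectors as part of this merge. For reference, a minimal batch-job sketch of how it is wired up; the contact point 127.0.0.1, the example.wordcount table, and the class name are hypothetical assumptions, not part of this commit:

import com.datastax.driver.core.Cluster;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class CassandraOutputFormatSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<Tuple2<String, Integer>> data =
				env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));

		// One bind variable per Tuple field; writeRecord() binds them in field order.
		// Keyspace/table "example.wordcount" is assumed to exist (hypothetical).
		data.output(new CassandraOutputFormat<Tuple2<String, Integer>>(
				"INSERT INTO example.wordcount (word, count) VALUES (?, ?);",
				new ClusterBuilder() {
					@Override
					protected Cluster buildCluster(Cluster.Builder builder) {
						return builder.addContactPoint("127.0.0.1").build();
					}
				}));

		env.execute("cassandra output format sketch");
	}
}
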
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
deleted file mode 100644
index 63b76da..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * CheckpointCommitter that saves information about completed checkpoints within a separate table in a Cassandra
- * database.
- *
- * <p>Entries are of the form | operator_id | subtask_id | last_completed_checkpoint |.
- */
-public class CassandraCommitter extends CheckpointCommitter {
-
- private static final long serialVersionUID = 1L;
-
- private final ClusterBuilder builder;
- private transient Cluster cluster;
- private transient Session session;
-
- private String keySpace = "flink_auxiliary";
- private String table = "checkpoints_";
-
- /**
- * A cache of the last committed checkpoint ids per subtask index. This is used to
- * avoid redundant round-trips to Cassandra (see {@link #isCheckpointCommitted(int, long)}).
- */
- private final Map<Integer, Long> lastCommittedCheckpoints = new HashMap<>();
-
- public CassandraCommitter(ClusterBuilder builder) {
- this.builder = builder;
- ClosureCleaner.clean(builder, true);
- }
-
- public CassandraCommitter(ClusterBuilder builder, String keySpace) {
- this(builder);
- this.keySpace = keySpace;
- }
-
- /**
- * Internally used to set the job ID after instantiation.
- */
- public void setJobId(String id) throws Exception {
- super.setJobId(id);
- table += id;
- }
-
- /**
- * Creates the keyspace and table used to store checkpoint information.
- *
- * @throws Exception if the keyspace or table cannot be created
- */
- @Override
- public void createResource() throws Exception {
- cluster = builder.getCluster();
- session = cluster.connect();
-
- session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':1};", keySpace));
- session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table));
-
- try {
- session.close();
- } catch (Exception e) {
- LOG.error("Error while closing session.", e);
- }
- try {
- cluster.close();
- } catch (Exception e) {
- LOG.error("Error while closing cluster.", e);
- }
- }
-
- @Override
- public void open() throws Exception {
- if (builder == null) {
- throw new RuntimeException("No ClusterBuilder was set.");
- }
- cluster = builder.getCluster();
- session = cluster.connect();
- }
-
- @Override
- public void close() throws Exception {
- this.lastCommittedCheckpoints.clear();
- try {
- session.close();
- } catch (Exception e) {
- LOG.error("Error while closing session.", e);
- }
- try {
- cluster.close();
- } catch (Exception e) {
- LOG.error("Error while closing cluster.", e);
- }
- }
-
- @Override
- public void commitCheckpoint(int subtaskIdx, long checkpointId) {
- String statement = String.format(
- "UPDATE %s.%s set checkpoint_id=%d where sink_id='%s' and sub_id=%d;",
- keySpace, table, checkpointId, operatorId, subtaskIdx);
-
- session.execute(statement);
- lastCommittedCheckpoints.put(subtaskIdx, checkpointId);
- }
-
- @Override
- public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
- // Pending checkpointed buffers are committed in ascending order of their
- // checkpoint id. This way we can tell if a checkpointed buffer was committed
- // just by asking the third-party storage system for the last checkpoint id
- // committed by the specified subtask.
-
- Long lastCommittedCheckpoint = lastCommittedCheckpoints.get(subtaskIdx);
- if (lastCommittedCheckpoint == null) {
- String statement = String.format(
- "SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;",
- keySpace, table, operatorId, subtaskIdx);
-
- Iterator<Row> resultIt = session.execute(statement).iterator();
- if (resultIt.hasNext()) {
- lastCommittedCheckpoint = resultIt.next().getLong("checkpoint_id");
- lastCommittedCheckpoints.put(subtaskIdx, lastCommittedCheckpoint);
- }
- }
- return lastCommittedCheckpoint != null && checkpointId <= lastCommittedCheckpoint;
- }
-}
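
The isCheckpointCommitted() logic above works because pending checkpoints are committed in ascending order per subtask, so a single last-committed id per (sink_id, sub_id) row is sufficient. A minimal sketch of the same bookkeeping, with a hypothetical in-memory map standing in for the Cassandra table:

import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the checkpoints table; it mirrors
// CassandraCommitter's commit/lookup logic without any round-trips.
public class InMemoryCommitterSketch {
	private final Map<Integer, Long> lastCommitted = new HashMap<>();

	// Corresponds to the UPDATE ... SET checkpoint_id=? statement.
	public void commitCheckpoint(int subtaskIdx, long checkpointId) {
		lastCommitted.put(subtaskIdx, checkpointId);
	}

	// Corresponds to the SELECT checkpoint_id ... lookup plus the cache check:
	// anything at or below the last committed id has been committed.
	public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
		Long last = lastCommitted.get(subtaskIdx);
		return last != null && checkpointId <= last;
	}
}
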
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
deleted file mode 100644
index 650c481..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.mapping.Mapper;
-import com.datastax.driver.mapping.MappingManager;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * Flink Sink to save data into a Cassandra cluster using
- * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html">Mapper</a>,
- * which reads the annotations from
- * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/annotations/package-summary.html">
- * com.datastax.driver.mapping.annotations</a>.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
-
- private static final long serialVersionUID = 1L;
-
- protected final Class<IN> clazz;
- protected transient Mapper<IN> mapper;
- protected transient MappingManager mappingManager;
-
- /**
- * The main constructor for creating a CassandraPojoSink.
- *
- * @param clazz {@code Class<IN>} instance of the POJO type handled by this sink
- */
- public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) {
- super(builder);
- this.clazz = clazz;
- }
-
- @Override
- public void open(Configuration configuration) {
- super.open(configuration);
- try {
- this.mappingManager = new MappingManager(session);
- this.mapper = mappingManager.mapper(clazz);
- } catch (Exception e) {
- throw new RuntimeException("Cannot create CassandraPojoSink with input: " + clazz.getSimpleName(), e);
- }
- }
-
- @Override
- public ListenableFuture<Void> send(IN value) {
- return mapper.saveAsync(value);
- }
-}
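
CassandraPojoSink needs no query because the DataStax MappingManager derives keyspace, table, and columns from annotations on the element type. A minimal sketch of such a POJO; the example.messages table and the Message class are hypothetical:

import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

// Hypothetical keyspace/table; the mapper resolves both from these
// annotations, which is why no INSERT query is passed to the sink.
@Table(keyspace = "example", name = "messages")
public class Message {
	@Column(name = "body")
	private String body;

	public Message() {}  // the mapper requires a no-arg constructor

	public Message(String body) { this.body = body; }

	public String getBody() { return body; }
	public void setBody(String body) { this.body = body; }
}
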
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
deleted file mode 100644
index 180b638..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.transformations.SinkTransformation;
-import org.apache.flink.streaming.api.transformations.StreamTransformation;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-
-/**
- * This class wraps different Cassandra sink implementations to provide a common interface for all of them.
- *
- * @param <IN> input type
- */
-public class CassandraSink<IN> {
- private final boolean useDataStreamSink;
- private DataStreamSink<IN> sink1;
- private SingleOutputStreamOperator<IN> sink2;
-
- private CassandraSink(DataStreamSink<IN> sink) {
- sink1 = sink;
- useDataStreamSink = true;
- }
-
- private CassandraSink(SingleOutputStreamOperator<IN> sink) {
- sink2 = sink;
- useDataStreamSink = false;
- }
-
- private SinkTransformation<IN> getSinkTransformation() {
- return sink1.getTransformation();
- }
-
- private StreamTransformation<IN> getStreamTransformation() {
- return sink2.getTransformation();
- }
-
- /**
- * Sets the name of this sink. This name is
- * used for visualization and logging during runtime.
- *
- * @return The named sink.
- */
- public CassandraSink<IN> name(String name) {
- if (useDataStreamSink) {
- getSinkTransformation().setName(name);
- } else {
- getStreamTransformation().setName(name);
- }
- return this;
- }
-
- /**
- * Sets an ID for this operator.
- *
- * <p>The specified ID is used to assign the same operator ID across job
- * submissions (for example when starting a job from a savepoint).
- *
- * <p><strong>Important</strong>: this ID needs to be unique per
- * transformation and job. Otherwise, job submission will fail.
- *
- * @param uid The unique user-specified ID of this transformation.
- * @return The operator with the specified ID.
- */
- public CassandraSink<IN> uid(String uid) {
- if (useDataStreamSink) {
- getSinkTransformation().setUid(uid);
- } else {
- getStreamTransformation().setUid(uid);
- }
- return this;
- }
-
- /**
- * Sets the parallelism for this sink. The parallelism must be greater than zero.
- *
- * @param parallelism The parallelism for this sink.
- * @return The sink with set parallelism.
- */
- public CassandraSink<IN> setParallelism(int parallelism) {
- if (useDataStreamSink) {
- getSinkTransformation().setParallelism(parallelism);
- } else {
- getStreamTransformation().setParallelism(parallelism);
- }
- return this;
- }
-
- /**
- * Turns off chaining for this operator so thread co-location will not be
- * used as an optimization.
- *
- * <p>Chaining can be turned off for the whole
- * job by {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()};
- * however, this is not advised for performance
- * reasons.
- *
- * @return The sink with chaining disabled.
- */
- public CassandraSink<IN> disableChaining() {
- if (useDataStreamSink) {
- getSinkTransformation().setChainingStrategy(ChainingStrategy.NEVER);
- } else {
- getStreamTransformation().setChainingStrategy(ChainingStrategy.NEVER);
- }
- return this;
- }
-
- /**
- * Sets the slot sharing group of this operation. Parallel instances of
- * operations that are in the same slot sharing group will be co-located in the same
- * TaskManager slot, if possible.
- *
- * <p>Operations inherit the slot sharing group of input operations if all input operations
- * are in the same slot sharing group and no slot sharing group was explicitly specified.
- *
- * <p>Initially an operation is in the default slot sharing group. An operation can be put into
- * the default group explicitly by setting the slot sharing group to {@code "default"}.
- *
- * @param slotSharingGroup The slot sharing group name.
- */
- public CassandraSink<IN> slotSharingGroup(String slotSharingGroup) {
- if (useDataStreamSink) {
- getSinkTransformation().setSlotSharingGroup(slotSharingGroup);
- } else {
- getStreamTransformation().setSlotSharingGroup(slotSharingGroup);
- }
- return this;
- }
-
- /**
- * Writes a DataStream into a Cassandra database.
- *
- * @param input input DataStream
- * @param <IN> input type
- * @return CassandraSinkBuilder, to further configure the sink
- */
- public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
- if (input.getType() instanceof TupleTypeInfo) {
- DataStream<T> tupleInput = (DataStream<T>) input;
- return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
- } else {
- return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
- }
- }
-
- public abstract static class CassandraSinkBuilder<IN> {
- protected final DataStream<IN> input;
- protected final TypeSerializer<IN> serializer;
- protected final TypeInformation<IN> typeInfo;
- protected ClusterBuilder builder;
- protected String query;
- protected CheckpointCommitter committer;
- protected boolean isWriteAheadLogEnabled;
-
- public CassandraSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
- this.input = input;
- this.typeInfo = typeInfo;
- this.serializer = serializer;
- }
-
- /**
- * Sets the query that is to be executed for every record.
- *
- * @param query query to use
- * @return this builder
- */
- public CassandraSinkBuilder<IN> setQuery(String query) {
- this.query = query;
- return this;
- }
-
- /**
- * Sets the Cassandra host to connect to.
- *
- * @param host host to connect to
- * @return this builder
- */
- public CassandraSinkBuilder<IN> setHost(String host) {
- return setHost(host, 9042);
- }
-
- /**
- * Sets the Cassandra host/port to connect to.
- *
- * @param host host to connect to
- * @param port port to connect to
- * @return this builder
- */
- public CassandraSinkBuilder<IN> setHost(final String host, final int port) {
- if (this.builder != null) {
- throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder().");
- }
- this.builder = new ClusterBuilder() {
- @Override
- protected Cluster buildCluster(Cluster.Builder builder) {
- return builder.addContactPoint(host).withPort(port).build();
- }
- };
- return this;
- }
-
- /**
- * Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the connection to Cassandra.
- *
- * @param builder ClusterBuilder to configure the connection to Cassandra
- * @return this builder
- */
- public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) {
- if (this.builder != null) {
- throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder().");
- }
- this.builder = builder;
- return this;
- }
-
- /**
- * Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use
- * idempotent updates.
- *
- * @return this builder
- */
- public CassandraSinkBuilder<IN> enableWriteAheadLog() {
- this.isWriteAheadLogEnabled = true;
- return this;
- }
-
- /**
- * Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use
- * idempotent updates.
- *
- * @param committer CheckpointCommitter that stores information about completed checkpoints in an external
- * resource. By default, this information is stored in a separate table within Cassandra.
- * @return this builder
- */
- public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer) {
- this.isWriteAheadLogEnabled = true;
- this.committer = committer;
- return this;
- }
-
- /**
- * Finalizes the configuration of this sink.
- *
- * @return finalized sink
- * @throws Exception if the sink could not be created
- */
- public abstract CassandraSink<IN> build() throws Exception;
-
- protected void sanityCheck() {
- if (builder == null) {
- throw new IllegalArgumentException("Cassandra host information must be supplied using either setHost() or setClusterBuilder().");
- }
- }
- }
-
- public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
- public CassandraTupleSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
- super(input, typeInfo, serializer);
- }
-
- @Override
- protected void sanityCheck() {
- super.sanityCheck();
- if (query == null || query.length() == 0) {
- throw new IllegalArgumentException("Query must not be null or empty.");
- }
- }
-
- @Override
- public CassandraSink<IN> build() throws Exception {
- sanityCheck();
- if (isWriteAheadLogEnabled) {
- return committer == null
- ? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, new CassandraCommitter(builder))))
- : new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, committer)));
- } else {
- return new CassandraSink<>(input.addSink(new CassandraTupleSink<IN>(query, builder)).name("Cassandra Sink"));
- }
- }
- }
-
- public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
- public CassandraPojoSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
- super(input, typeInfo, serializer);
- }
-
- @Override
- protected void sanityCheck() {
- super.sanityCheck();
- if (query != null) {
- throw new IllegalArgumentException("Specifying a query is not allowed when using a Pojo-Stream as input.");
- }
- }
-
- @Override
- public CassandraSink<IN> build() throws Exception {
- sanityCheck();
- if (isWriteAheadLogEnabled) {
- throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
- } else {
- return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink"));
- }
- }
- }
-}
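
Tying the builder above together: with a Tuple-typed stream, addSink() returns a CassandraTupleSinkBuilder, so a query is mandatory; with any other type it returns a CassandraPojoSinkBuilder, where a query is rejected. A minimal sketch of the tuple path, reusing the same hypothetical host and example.wordcount table as above:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

public class CassandraSinkBuilderSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<Tuple2<String, Integer>> stream =
				env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));

		// Tuple input selects the tuple builder, so setQuery() is required.
		CassandraSink.addSink(stream)
				.setQuery("INSERT INTO example.wordcount (word, count) VALUES (?, ?);")
				.setHost("127.0.0.1")  // defaults to port 9042
				.build()
				.name("wordcount cassandra sink");

		env.execute("cassandra sink builder sketch");
	}
}
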
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
deleted file mode 100644
index 49b1efa..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
- protected static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBase.class);
- protected transient Cluster cluster;
- protected transient Session session;
-
- protected transient Throwable exception = null;
- protected transient FutureCallback<V> callback;
-
- private final ClusterBuilder builder;
-
- protected CassandraSinkBase(ClusterBuilder builder) {
- this.builder = builder;
- ClosureCleaner.clean(builder, true);
- }
-
- @Override
- public void open(Configuration configuration) {
- this.callback = new FutureCallback<V>() {
- @Override
- public void onSuccess(V ignored) {
- }
-
- @Override
- public void onFailure(Throwable t) {
- exception = t;
- LOG.error("Error while sending value.", t);
- }
- };
- this.cluster = builder.getCluster();
- this.session = cluster.connect();
- }
-
- @Override
- public void invoke(IN value) throws Exception {
- if (exception != null) {
- throw new IOException("invoke() failed", exception);
- }
- ListenableFuture<V> result = send(value);
- Futures.addCallback(result, callback);
- }
-
- public abstract ListenableFuture<V> send(IN value);
-
- @Override
- public void close() {
- try {
- if (session != null) {
- session.close();
- }
- } catch (Exception e) {
- LOG.error("Error while closing session.", e);
- }
- try {
- if (cluster != null) {
- cluster.close();
- }
- } catch (Exception e) {
- LOG.error("Error while closing cluster.", e);
- }
- }
-}
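
CassandraSinkBase centralizes the connection lifecycle and asynchronous failure handling: a failed send() is recorded by the callback and rethrown on the next invoke(). A subclass therefore only has to implement send(); a minimal sketch for a hypothetical String-typed sink:

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

// Hypothetical subclass: the base class connects, installs the failure
// callback, and closes resources; this class only supplies send().
public class CassandraStringSink extends CassandraSinkBase<String, ResultSet> {
	private final String insertQuery;
	private transient PreparedStatement ps;

	public CassandraStringSink(String insertQuery, ClusterBuilder builder) {
		super(builder);
		this.insertQuery = insertQuery;
	}

	@Override
	public void open(Configuration configuration) {
		super.open(configuration);  // connects and installs the failure callback
		this.ps = session.prepare(insertQuery);
	}

	@Override
	public ListenableFuture<ResultSet> send(String value) {
		return session.executeAsync(ps.bind(value));
	}
}
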
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
deleted file mode 100644
index 0a9ef06..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * Flink Sink to save data into a Cassandra cluster.
- *
- * @param <IN> Type of the elements emitted by this sink; it must extend {@link Tuple}
- */
-public class CassandraTupleSink<IN extends Tuple> extends CassandraSinkBase<IN, ResultSet> {
- private final String insertQuery;
- private transient PreparedStatement ps;
-
- public CassandraTupleSink(String insertQuery, ClusterBuilder builder) {
- super(builder);
- this.insertQuery = insertQuery;
- }
-
- @Override
- public void open(Configuration configuration) {
- super.open(configuration);
- this.ps = session.prepare(insertQuery);
- }
-
- @Override
- public ListenableFuture<ResultSet> send(IN value) {
- Object[] fields = extract(value);
- return session.executeAsync(ps.bind(fields));
- }
-
- private Object[] extract(IN record) {
- Object[] fields = new Object[record.getArity()];
- for (int i = 0; i < record.getArity(); i++) {
- fields[i] = record.getField(i);
- }
- return fields;
- }
-}
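
CassandraTupleSink can also be attached directly with DataStream#addSink(), bypassing the CassandraSink builder when no write-ahead log is needed; a minimal sketch, again assuming the hypothetical example.wordcount table:

import com.datastax.driver.core.Cluster;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraTupleSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class DirectTupleSinkSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Bind variables are filled positionally from the Tuple fields.
		env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2))
				.addSink(new CassandraTupleSink<Tuple2<String, Integer>>(
						"INSERT INTO example.wordcount (word, count) VALUES (?, ?);",
						new ClusterBuilder() {
							@Override
							protected Cluster buildCluster(Cluster.Builder builder) {
								return builder.addContactPoint("127.0.0.1").build();
							}
						}));

		env.execute("direct tuple sink sketch");
	}
}
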