You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:09 UTC

[19/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
new file mode 100644
index 0000000..2ed2f8c
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc.split;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+
+/**
+ * A pass-through {@link ParameterValuesProvider}: it simply hands back the matrix of
+ * query parameters that the caller built before creating the {@link JDBCInputFormat}.
+ */
+public class GenericParameterValuesProvider implements ParameterValuesProvider {
+
+	/** Caller-supplied parameter matrix, stored by reference (no copy is made). */
+	private final Serializable[][] precomputedValues;
+
+	/** @param parameters the parameter matrix to expose through {@link #getParameterValues()} */
+	public GenericParameterValuesProvider(Serializable[][] parameters) {
+		precomputedValues = parameters;
+	}
+
+	/** Returns the very same array instance that was passed to the constructor. */
+	@Override
+	public Serializable[][] getParameterValues() {
+		// nothing to compute here: the values were prepared externally
+		return precomputedValues;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
new file mode 100644
index 0000000..ac56b98
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc.split;
+
+import java.io.Serializable;
+
+/**
+ * Generates the parameters for a query containing a BETWEEN constraint on a numeric column.
+ * The inclusive range [min, max] is cut into consecutive sub-ranges of fetchSize values; only the last one may be shorter.
+ *
+ * For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>, using a query like:
+ * <PRE>
+ *   SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
+ * </PRE>
+ * you can use this class to automatically generate the parameters of the BETWEEN clause.
+ */
+public class NumericBetweenParametersProvider implements ParameterValuesProvider {
+
+	private final long fetchSize;
+	private final long min;
+	private final long max;
+
+	/**
+	 * @param fetchSize number of consecutive values per generated range, must be positive
+	 * @param min lower bound of the whole range (inclusive)
+	 * @param max upper bound of the whole range (inclusive), must not be smaller than min
+	 * @throws IllegalArgumentException if fetchSize is not positive or min is greater than max
+	 */
+	public NumericBetweenParametersProvider(long fetchSize, long min, long max) {
+		if (fetchSize <= 0 || min > max) {
+			throw new IllegalArgumentException("Invalid range: fetchSize=" + fetchSize + ", min=" + min + ", max=" + max);
+		}
+		this.fetchSize = fetchSize;
+		this.min = min;
+		this.max = max;
+	}
+
+	/** Returns one {@code Long[]{start, end}} row per sub-range, in ascending order. */
+	@Override
+	public Serializable[][] getParameterValues() {
+		int size = (int) ((max - min) / fetchSize + 1); // == ceil((max - min + 1) / fetchSize), without floating point
+		Serializable[][] parameters = new Serializable[size][2];
+		for (int i = 0; i < size; i++) {
+			long start = min + i * fetchSize;
+			// clamp the last range to max; comparing distances avoids overflowing start + fetchSize - 1
+			long end = (max - start < fetchSize) ? max : start + fetchSize - 1;
+			parameters[i] = new Long[]{start, end};
+		}
+		return parameters;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
new file mode 100644
index 0000000..c194497
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc.split;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+
+/**
+ * Supplies the {@link JDBCInputFormat} with the parameter sets of the queries to run in
+ * parallel (i.e. the splits): every row of the returned matrix parameterizes one query.
+ */
+public interface ParameterValuesProvider {
+
+	/**
+	 * Returns the parameter matrix used to query a table in parallel.
+	 * Each row holds the values used to parameterize one of the queries.
+	 */
+	Serializable[][] getParameterValues();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
new file mode 100644
index 0000000..da9469b
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
+import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
+import org.apache.flink.api.table.Row;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JDBCFullTest extends JDBCTestBase {
+
+	/** Copies the books table into newbooks via a Flink job, first unsplit and then split by id. */
+	@Test
+	public void testJdbcInOut() throws Exception {
+		//run without parallelism
+		runTest(false);
+
+		//cleanup
+		JDBCTestBase.tearDownClass();
+		JDBCTestBase.prepareTestDb();
+
+		//run exploiting parallelism
+		runTest(true);
+	}
+
+	/**
+	 * Runs the read/write job and checks that the output table has one row per test record.
+	 *
+	 * @param exploitParallelism whether to read with the query that is split by id ranges
+	 * @throws Exception if the job or the verification query fails (propagated to keep the stack trace)
+	 */
+	private void runTest(boolean exploitParallelism) throws Exception {
+		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername(JDBCTestBase.DRIVER_CLASS)
+				.setDBUrl(JDBCTestBase.DB_URL)
+				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS)
+				.setRowTypeInfo(rowTypeInfo);
+
+		if (exploitParallelism) {
+			final int fetchSize = 1;
+			//assumes the first and the last record hold the smallest and the largest id (TODO confirm in JDBCTestBase)
+			final long min = Long.parseLong(JDBCTestBase.testData[0][0].toString());
+			final long max = Long.parseLong(JDBCTestBase.testData[JDBCTestBase.testData.length - 1][0].toString());
+			//use a "splittable" query to exploit parallelism
+			inputBuilder = inputBuilder
+					.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
+					.setParametersProvider(new NumericBetweenParametersProvider(fetchSize, min, max));
+		}
+		DataSet<Row> source = environment.createInput(inputBuilder.finish());
+
+		//NOTE: in this case (with Derby driver) setSqlTypes could be skipped, but
+		//some databases don't handle null values correctly when no column type is specified
+		//in PreparedStatement.setObject (see its javadoc for more details)
+		source.output(JDBCOutputFormat.buildJDBCOutputFormat()
+				.setDrivername(JDBCTestBase.DRIVER_CLASS)
+				.setDBUrl(JDBCTestBase.DB_URL)
+				.setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
+				.setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR,Types.DOUBLE,Types.INTEGER})
+				.finish());
+		environment.execute();
+
+		try (
+			Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL);
+			PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
+			ResultSet resultSet = statement.executeQuery()
+		) {
+			int count = 0;
+			while (resultSet.next()) {
+				count++;
+			}
+			Assert.assertEquals(JDBCTestBase.testData.length, count);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
new file mode 100644
index 0000000..efae076
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.ResultSet;
+
+import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
+import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
+import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.core.io.InputSplit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JDBCInputFormatTest extends JDBCTestBase {
+
+	private JDBCInputFormat jdbcInputFormat;
+
+	@After
+	public void tearDown() throws IOException {
+		if (jdbcInputFormat != null) {
+			jdbcInputFormat.close();
+		}
+		jdbcInputFormat = null;
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testUntypedRowInfo() throws IOException {
+		//NOTE(review): presumably the missing row type info is what gets rejected, but the driver name is invalid as well - TODO confirm
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername("org.apache.derby.jdbc.idontexist")
+				.setDBUrl(DB_URL)
+				.setQuery(SELECT_ALL_BOOKS)
+				.finish();
+		jdbcInputFormat.openInputFormat();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidDriver() throws IOException {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername("org.apache.derby.jdbc.idontexist")
+				.setDBUrl(DB_URL)
+				.setQuery(SELECT_ALL_BOOKS)
+				.setRowTypeInfo(rowTypeInfo)
+				.finish();
+		jdbcInputFormat.openInputFormat();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidURL() throws IOException {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
+				.setQuery(SELECT_ALL_BOOKS)
+				.setRowTypeInfo(rowTypeInfo)
+				.finish();
+		jdbcInputFormat.openInputFormat();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidQuery() throws IOException {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl(DB_URL)
+				.setQuery("iamnotsql")
+				.setRowTypeInfo(rowTypeInfo)
+				.finish();
+		jdbcInputFormat.openInputFormat();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testIncompleteConfiguration() throws IOException {
+		//no DB url is set here
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername(DRIVER_CLASS)
+				.setQuery(SELECT_ALL_BOOKS)
+				.setRowTypeInfo(rowTypeInfo)
+				.finish();
+	}
+
+	@Test
+	public void testJDBCInputFormatWithoutParallelism() throws IOException, InstantiationException, IllegalAccessException {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl(DB_URL)
+				.setQuery(SELECT_ALL_BOOKS)
+				.setRowTypeInfo(rowTypeInfo)
+				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
+				.finish();
+		//this query does not exploit parallelism
+		Assert.assertEquals(1, jdbcInputFormat.createInputSplits(1).length);
+		jdbcInputFormat.openInputFormat();
+		jdbcInputFormat.open(null);
+		Row row = new Row(5);
+		int recordCount = 0;
+		while (!jdbcInputFormat.reachedEnd()) {
+			Row next = jdbcInputFormat.nextRecord(row);
+			if (next == null) {
+				break;
+			}
+			assertFieldTypes(next);
+			assertMatchesTestData(recordCount, next);
+			recordCount++;
+		}
+		jdbcInputFormat.close();
+		jdbcInputFormat.closeInputFormat();
+		Assert.assertEquals(testData.length, recordCount);
+	}
+
+	/** Reads the table through one split per test record and compares every row with the test data. */
+	@Test
+	public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException {
+		final int fetchSize = 1;
+		final long min = Long.parseLong(String.valueOf(testData[0][0]));
+		final long max = Long.parseLong(String.valueOf(testData[testData.length - 1][0]));
+		ParameterValuesProvider paramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl(DB_URL)
+				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
+				.setRowTypeInfo(rowTypeInfo)
+				.setParametersProvider(paramProvider)
+				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
+				.finish();
+
+		jdbcInputFormat.openInputFormat();
+		InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
+		//this query exploits parallelism (1 split for every id)
+		Assert.assertEquals(testData.length, splits.length);
+		int recordCount = 0;
+		Row row = new Row(5);
+		for (int i = 0; i < splits.length; i++) {
+			jdbcInputFormat.open(splits[i]);
+			while (!jdbcInputFormat.reachedEnd()) {
+				Row next = jdbcInputFormat.nextRecord(row);
+				if (next == null) {
+					break;
+				}
+				assertFieldTypes(next);
+				assertMatchesTestData(recordCount, next);
+				recordCount++;
+			}
+			jdbcInputFormat.close();
+		}
+		jdbcInputFormat.closeInputFormat();
+		Assert.assertEquals(testData.length, recordCount);
+	}
+
+	/** Reads with one split per row of queryParameters and expects three matching records in total. */
+	@Test
+	public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException, InstantiationException, IllegalAccessException {
+		Serializable[][] queryParameters = new String[2][1];
+		queryParameters[0] = new String[]{"Kumar"};
+		queryParameters[1] = new String[]{"Tan Ah Teck"};
+		ParameterValuesProvider paramProvider = new GenericParameterValuesProvider(queryParameters);
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl(DB_URL)
+				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
+				.setRowTypeInfo(rowTypeInfo)
+				.setParametersProvider(paramProvider)
+				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
+				.finish();
+		jdbcInputFormat.openInputFormat();
+		InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
+		//this query exploits parallelism (1 split for every queryParameters row)
+		Assert.assertEquals(queryParameters.length, splits.length);
+		int recordCount = 0;
+		Row row = new Row(5);
+		for (int i = 0; i < splits.length; i++) {
+			jdbcInputFormat.open(splits[i]);
+			while (!jdbcInputFormat.reachedEnd()) {
+				Row next = jdbcInputFormat.nextRecord(row);
+				if (next == null) {
+					break;
+				}
+				assertFieldTypes(next);
+				recordCount++;
+			}
+			jdbcInputFormat.close();
+		}
+		Assert.assertEquals(3, recordCount);
+		jdbcInputFormat.closeInputFormat();
+	}
+
+	@Test
+	public void testEmptyResults() throws IOException, InstantiationException, IllegalAccessException {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl(DB_URL)
+				.setQuery(SELECT_EMPTY)
+				.setRowTypeInfo(rowTypeInfo)
+				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
+				.finish();
+		jdbcInputFormat.openInputFormat();
+		jdbcInputFormat.open(null);
+		Row row = new Row(5);
+		int recordsCnt = 0;
+		while (!jdbcInputFormat.reachedEnd()) {
+			Assert.assertNull(jdbcInputFormat.nextRecord(row));
+			recordsCnt++;
+		}
+		jdbcInputFormat.close();
+		jdbcInputFormat.closeInputFormat();
+		Assert.assertEquals(0, recordsCnt);
+	}
+
+	/** Checks that every non-null field of the row has the Java type expected for its column. */
+	private void assertFieldTypes(Row row) {
+		final Class<?>[] expectedTypes = {Integer.class, String.class, String.class, Double.class, Integer.class};
+		final String[] typeNames = {"int", "String", "String", "double", "int"};
+		for (int x = 0; x < expectedTypes.length; x++) {
+			Object field = row.productElement(x);
+			if (field != null) {
+				Assert.assertEquals("Field " + x + " should be " + typeNames[x], expectedTypes[x], field.getClass());
+			}
+		}
+	}
+
+	/** Compares the row with testData[recordIndex], skipping the columns whose expected value is null. */
+	private void assertMatchesTestData(int recordIndex, Row row) {
+		for (int x = 0; x < 5; x++) {
+			if (testData[recordIndex][x] != null) {
+				Assert.assertEquals(testData[recordIndex][x], row.productElement(x));
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
new file mode 100644
index 0000000..086a84c
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JDBCOutputFormatTest extends JDBCTestBase {
+
+	private JDBCOutputFormat jdbcOutputFormat;
+	//only filled by testIncompatibleTypes; testJDBCOutputFormat merely reads its arity
+	private Tuple5<Integer, String, String, Double, String> tuple5 = new Tuple5<>();
+
+	@After
+	public void tearDown() throws IOException {
+		if (jdbcOutputFormat != null) {
+			jdbcOutputFormat.close();
+		}
+		jdbcOutputFormat = null;
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidDriver() throws IOException {
+		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+				.setDrivername("org.apache.derby.jdbc.idontexist")
+				.setDBUrl(DB_URL)
+				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
+				.finish();
+		jdbcOutputFormat.open(0, 1);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidURL() throws IOException {
+		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
+				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
+				.finish();
+		jdbcOutputFormat.open(0, 1);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidQuery() throws IOException {
+		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl(DB_URL)
+				.setQuery("iamnotsql")
+				.finish();
+		jdbcOutputFormat.open(0, 1);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testIncompleteConfiguration() throws IOException {
+		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+				.setDrivername(DRIVER_CLASS)
+				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
+				.finish();
+	}
+
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testIncompatibleTypes() throws IOException {
+		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl(DB_URL)
+				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
+				.finish();
+		jdbcOutputFormat.open(0, 1);
+
+		tuple5.setField(4, 0);
+		tuple5.setField("hello", 1);
+		tuple5.setField("world", 2);
+		tuple5.setField(0.99, 3);
+		tuple5.setField("imthewrongtype", 4);
+
+		Row row = new Row(tuple5.getArity());
+		for (int i = 0; i < tuple5.getArity(); i++) {
+			row.setField(i, tuple5.getField(i));
+		}
+		jdbcOutputFormat.writeRecord(row);
+		jdbcOutputFormat.close();
+	}
+
+	/** Writes all test records to the output table and reads them back through plain JDBC to verify them. */
+	@Test
+	public void testJDBCOutputFormat() throws IOException, SQLException, InstantiationException, IllegalAccessException {
+
+		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl(DB_URL)
+				.setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
+				.finish();
+		jdbcOutputFormat.open(0, 1);
+
+		for (int i = 0; i < testData.length; i++) {
+			Row row = new Row(testData[i].length);
+			for (int j = 0; j < testData[i].length; j++) {
+				row.setField(j, testData[i][j]);
+			}
+			jdbcOutputFormat.writeRecord(row);
+		}
+
+		jdbcOutputFormat.close();
+
+		try (
+			Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL);
+			PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
+			ResultSet resultSet = statement.executeQuery()
+		) {
+			int recordCount = 0;
+			while (resultSet.next()) {
+				Row row = new Row(tuple5.getArity());
+				for (int i = 0; i < tuple5.getArity(); i++) {
+					row.setField(i, resultSet.getObject(i + 1));
+				}
+				if (row.productElement(0) != null) {
+					Assert.assertEquals("Field 0 should be int", Integer.class, row.productElement(0).getClass());
+				}
+				if (row.productElement(1) != null) {
+					Assert.assertEquals("Field 1 should be String", String.class, row.productElement(1).getClass());
+				}
+				if (row.productElement(2) != null) {
+					Assert.assertEquals("Field 2 should be String", String.class, row.productElement(2).getClass());
+				}
+				if (row.productElement(3) != null) {
+					Assert.assertEquals("Field 3 should be double", Double.class, row.productElement(3).getClass());
+				}
+				if (row.productElement(4) != null) {
+					Assert.assertEquals("Field 4 should be int", Integer.class, row.productElement(4).getClass());
+				}
+
+				for (int x = 0; x < tuple5.getArity(); x++) {
+					if (JDBCTestBase.testData[recordCount][x] != null) {
+						Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.productElement(x));
+					}
+				}
+
+				recordCount++;
+			}
+			Assert.assertEquals(JDBCTestBase.testData.length, recordCount);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
new file mode 100644
index 0000000..69ad693
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+/**
+ * Base test class for JDBC Input and Output formats
+ */
+public class JDBCTestBase {
+
+	public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
+	public static final String DB_URL = "jdbc:derby:memory:ebookshop";
+	public static final String INPUT_TABLE = "books";
+	public static final String OUTPUT_TABLE = "newbooks";
+	public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
+	public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE;
+	public static final String SELECT_EMPTY = SELECT_ALL_BOOKS + " WHERE QTY < 0";
+	public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)";
+	public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
+	public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = SELECT_ALL_BOOKS + " WHERE author = ?";
+
+	/** Connection used by the class-level set-up and tear-down; each of them closes it again before returning. */
+	protected static Connection conn;
+
+	/** Rows loaded into the input table: id, title, author, price (null in the last row), qty. */
+	public static final Object[][] testData = {
+			{1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11},
+			{1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
+			{1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
+			{1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
+			{1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55},
+			{1006, ("A Teaspoon of Java 1.4"), ("Kevin Jones"), 66.66, 66},
+			{1007, ("A Teaspoon of Java 1.5"), ("Kevin Jones"), 77.77, 77},
+			{1008, ("A Teaspoon of Java 1.6"), ("Kevin Jones"), 88.88, 88},
+			{1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 99.99, 99},
+			{1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010}};
+
+	/** Field types of one row, in the column order of {@link #testData}. */
+	public static final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
+		BasicTypeInfo.INT_TYPE_INFO,
+		BasicTypeInfo.STRING_TYPE_INFO,
+		BasicTypeInfo.STRING_TYPE_INFO,
+		BasicTypeInfo.DOUBLE_TYPE_INFO,
+		BasicTypeInfo.INT_TYPE_INFO
+	};
+
+	/** Row type information built from {@link #fieldTypes}. */
+	public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
+
+	/**
+	 * Builds the DDL statement that creates a table with the id/title/author/price/qty columns.
+	 *
+	 * @param tableName name of the table to create; spliced into the statement verbatim
+	 * @return the CREATE TABLE statement
+	 */
+	public static String getCreateQuery(String tableName) {
+		return "CREATE TABLE " + tableName + " (" +
+			"id INT NOT NULL DEFAULT 0," +
+			"title VARCHAR(50) DEFAULT NULL," +
+			"author VARCHAR(50) DEFAULT NULL," +
+			"price FLOAT DEFAULT NULL," +
+			"qty INT DEFAULT NULL," +
+			"PRIMARY KEY (id))";
+	}
+
+	/**
+	 * Builds one multi-row INSERT that loads {@link #testData} into {@link #INPUT_TABLE}.
+	 * Values are spliced in as literals; the unquoted numeric columns therefore render a
+	 * null entry as the bare word null.
+	 *
+	 * @return the INSERT statement
+	 */
+	public static String getInsertQuery() {
+		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO ");
+		sqlQueryBuilder.append(INPUT_TABLE).append(" (id, title, author, price, qty) VALUES ");
+		for (int i = 0; i < testData.length; i++) {
+			Object[] row = testData[i];
+			if (i > 0) {
+				sqlQueryBuilder.append(",");
+			}
+			sqlQueryBuilder.append("(")
+				.append(row[0]).append(",'")
+				.append(row[1]).append("','")
+				.append(row[2]).append("',")
+				.append(row[3]).append(",")
+				.append(row[4]).append(")");
+		}
+		return sqlQueryBuilder.toString();
+	}
+
+	/**
+	 * Stream that discards everything written to it. Its fully qualified field name is handed
+	 * to Derby through the "derby.stream.error.field" system property, so the field must keep
+	 * its name and stay public and static.
+	 */
+	public static final OutputStream DEV_NULL = new OutputStream() {
+		@Override
+		public void write(int b) {
+			// intentionally empty: all output is dropped
+		}
+	};
+
+	/**
+	 * Creates the in-memory database with the input and output tables and loads the test rows.
+	 *
+	 * @throws Exception if the driver class cannot be loaded or a statement fails
+	 */
+	public static void prepareTestDb() throws Exception {
+		System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+		Class.forName(DRIVER_CLASS);
+		// try-with-resources releases statement and connection even when a statement fails
+		try (
+			Connection dbConn = DriverManager.getConnection(DB_URL + ";create=true");
+			Statement stat = dbConn.createStatement()
+		) {
+			stat.executeUpdate(getCreateQuery(INPUT_TABLE));
+			stat.executeUpdate(getCreateQuery(OUTPUT_TABLE));
+			stat.execute(getInsertQuery());
+		}
+	}
+
+	/**
+	 * Redirects the Derby error stream to {@link #DEV_NULL} and prepares the database once per test class.
+	 *
+	 * @throws SQLException if creating or filling the tables fails
+	 */
+	@BeforeClass
+	public static void setUpClass() throws SQLException {
+		try {
+			System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+			prepareDerbyDatabase();
+		} catch (ClassNotFoundException e) {
+			// Assert.fail cannot carry a cause, so print the trace and name the exception in the message
+			e.printStackTrace();
+			Assert.fail("Could not load JDBC driver " + DRIVER_CLASS + ": " + e);
+		}
+	}
+
+	/** Loads the driver, then creates both tables and inserts the test rows over {@link #conn}. */
+	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
+		Class.forName(DRIVER_CLASS);
+		try (Connection dbConn = DriverManager.getConnection(DB_URL + ";create=true")) {
+			// the helpers below work on the shared field
+			conn = dbConn;
+			createTable(INPUT_TABLE);
+			createTable(OUTPUT_TABLE);
+			insertDataIntoInputTable();
+		}
+	}
+
+	/** Creates the table {@code tableName} over {@link #conn}. */
+	private static void createTable(String tableName) throws SQLException {
+		try (Statement stat = conn.createStatement()) {
+			stat.executeUpdate(getCreateQuery(tableName));
+		}
+	}
+
+	/** Inserts all rows of {@link #testData} into the input table over {@link #conn}. */
+	private static void insertDataIntoInputTable() throws SQLException {
+		try (Statement stat = conn.createStatement()) {
+			stat.execute(getInsertQuery());
+		}
+	}
+
+	/** Drops the test tables once all tests of the class have run. */
+	@AfterClass
+	public static void tearDownClass() {
+		cleanUpDerbyDatabases();
+	}
+
+	/** Drops the input and output tables; any error fails the test class with a descriptive message. */
+	private static void cleanUpDerbyDatabases() {
+		try {
+			Class.forName(DRIVER_CLASS);
+			try (
+				Connection dbConn = DriverManager.getConnection(DB_URL + ";create=true");
+				Statement stat = dbConn.createStatement()
+			) {
+				conn = dbConn;
+				stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
+				stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE);
+			}
+		} catch (Exception e) {
+			// Assert.fail cannot carry a cause, so print the trace and name the exception in the message
+			e.printStackTrace();
+			Assert.fail("Could not drop the JDBC test tables: " + e);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties b/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2fb9345
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties
@@ -0,0 +1,19 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/resources/logback-test.xml b/flink-connectors/flink-jdbc/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
new file mode 100644
index 0000000..dcb33eb
--- /dev/null
+++ b/flink-connectors/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>1.2-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+
+	<artifactId>flink-connectors</artifactId>
+	<name>flink-connectors</name>
+	<packaging>pom</packaging>
+
+	<modules>
+		<module>flink-avro</module>
+		<module>flink-jdbc</module>
+		<module>flink-hadoop-compatibility</module>
+		<module>flink-hbase</module>
+		<module>flink-hcatalog</module>
+		<module>flink-connector-flume</module>
+		<module>flink-connector-kafka-base</module>
+		<module>flink-connector-kafka-0.8</module>
+		<module>flink-connector-kafka-0.9</module>
+		<module>flink-connector-kafka-0.10</module>
+		<module>flink-connector-elasticsearch</module>
+		<module>flink-connector-elasticsearch2</module>
+		<module>flink-connector-rabbitmq</module>
+		<module>flink-connector-twitter</module>
+		<module>flink-connector-nifi</module>
+		<module>flink-connector-cassandra</module>
+		<module>flink-connector-redis</module>
+		<module>flink-connector-filesystem</module>
+	</modules>
+
+	<!-- See main pom.xml for explanation of profiles -->
+	<profiles>
+		<!--
+			We include the kinesis module only optionally because it contains a dependency
+			licensed under the "Amazon Software License".
+			In accordance with the discussion in https://issues.apache.org/jira/browse/LEGAL-198
+			this is an optional module for Flink.
+		-->
+		<profile>
+			<id>include-kinesis</id>
+			<modules>
+				<module>flink-connector-kinesis</module>
+			</modules>
+		</profile>
+	</profiles>
+	
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/pom.xml b/flink-streaming-connectors/flink-connector-cassandra/pom.xml
deleted file mode 100644
index 3a1731c..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/pom.xml
+++ /dev/null
@@ -1,179 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-streaming-connectors</artifactId>
-		<version>1.2-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-connector-cassandra_2.10</artifactId>
-	<name>flink-connector-cassandra</name>
-
-	<packaging>jar</packaging>
-
-	<!-- Allow users to pass custom connector versions -->
-	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
-	</properties>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-plugin</artifactId>
-				<configuration>
-					<reuseForks>true</reuseForks>
-					<forkCount>1</forkCount>
-					<argLine>-Xms256m -Xmx2800m -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<version>2.4.1</version>
-				<executions>
-					<!-- Run shade goal on package phase -->
-					<execution>
-						<id>shade-flink</id>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration combine.self="override">
-							<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
-							<artifactSet>
-								<includes>
-									<include>com.datastax.cassandra:cassandra-driver-core</include>
-									<include>com.datastax.cassandra:cassandra-driver-mapping</include>
-									<include>com.google.guava:guava</include>
-								</includes>
-							</artifactSet>
-							<relocations>
-								<relocation>
-									<pattern>com.google</pattern>
-									<shadedPattern>org.apache.flink.cassandra.shaded.com.google</shadedPattern>
-									<excludes>
-										<exclude>com.google.protobuf.**</exclude>
-										<exclude>com.google.inject.**</exclude>
-									</excludes>
-								</relocation>
-							</relocations>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.datastax.cassandra</groupId>
-			<artifactId>cassandra-driver-core</artifactId>
-			<version>${driver.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>log4j-over-slf4j</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>ch.qos.logback</groupId>
-					<artifactId>logback-classic</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-		<dependency>
-			<groupId>com.datastax.cassandra</groupId>
-			<artifactId>cassandra-driver-mapping</artifactId>
-			<version>${driver.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>log4j-over-slf4j</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>ch.qos.logback</groupId>
-					<artifactId>logback-classic</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.cassandra</groupId>
-			<artifactId>cassandra-all</artifactId>
-			<version>${cassandra.version}</version>
-			<scope>test</scope>
-			<exclusions>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>log4j-over-slf4j</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>ch.qos.logback</groupId>
-					<artifactId>logback-classic</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-	</dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
deleted file mode 100644
index 849e023..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.batch.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.google.common.base.Strings;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.NonParallelInput;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * InputFormat to read data from Apache Cassandra and generate ${@link Tuple}.
- *
- * @param <OUT> type of Tuple
- */
-public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
-	private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormat.class);
-
-	private final String query;
-	private final ClusterBuilder builder;
-
-	private transient Cluster cluster;
-	private transient Session session;
-	private transient ResultSet resultSet;
-
-	public CassandraInputFormat(String query, ClusterBuilder builder) {
-		Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty");
-		Preconditions.checkArgument(builder != null, "Builder cannot be null");
-
-		this.query = query;
-		this.builder = builder;
-	}
-
-	@Override
-	public void configure(Configuration parameters) {
-		this.cluster = builder.getCluster();
-	}
-
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
-		return cachedStatistics;
-	}
-
-	/**
-	 * Opens a Session and executes the query.
-	 *
-	 * @param ignored
-	 * @throws IOException
-	 */
-	@Override
-	public void open(InputSplit ignored) throws IOException {
-		this.session = cluster.connect();
-		this.resultSet = session.execute(query);
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return resultSet.isExhausted();
-	}
-
-	@Override
-	public OUT nextRecord(OUT reuse) throws IOException {
-		final Row item = resultSet.one();
-		for (int i = 0; i < reuse.getArity(); i++) {
-			reuse.setField(item.getObject(i), i);
-		}
-		return reuse;
-	}
-
-	@Override
-	public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
-		GenericInputSplit[] split = {new GenericInputSplit(0, 1)};
-		return split;
-	}
-
-	@Override
-	public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
-		return new DefaultInputSplitAssigner(inputSplits);
-	}
-
-	/**
-	 * Closes all resources used.
-	 */
-	@Override
-	public void close() throws IOException {
-		try {
-			if (session != null) {
-				session.close();
-			}
-		} catch (Exception e) {
-			LOG.error("Error while closing session.", e);
-		}
-
-		try {
-			if (cluster != null ) {
-				cluster.close();
-			}
-		} catch (Exception e) {
-			LOG.error("Error while closing cluster.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
deleted file mode 100644
index 15d8fb3..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.batch.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Session;
-import com.google.common.base.Strings;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.apache.flink.api.common.io.RichOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * OutputFormat to write {@link org.apache.flink.api.java.tuple.Tuple} into Apache Cassandra.
- *
- * @param <OUT> type of Tuple
- */
-public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
-	private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);
-
-	private final String insertQuery;
-	private final ClusterBuilder builder;
-
-	private transient Cluster cluster;
-	private transient Session session;
-	private transient PreparedStatement prepared;
-	private transient FutureCallback<ResultSet> callback;
-	private transient Throwable exception = null;
-
-	public CassandraOutputFormat(String insertQuery, ClusterBuilder builder) {
-		Preconditions.checkArgument(!Strings.isNullOrEmpty(insertQuery), "Query cannot be null or empty");
-		Preconditions.checkArgument(builder != null, "Builder cannot be null");
-
-		this.insertQuery = insertQuery;
-		this.builder = builder;
-	}
-
-	@Override
-	public void configure(Configuration parameters) {
-		this.cluster = builder.getCluster();
-	}
-
-	/**
-	 * Opens a Session to Cassandra and initializes the prepared statement.
-	 *
-	 * @param taskNumber The number of the parallel instance.
-	 * @throws IOException Thrown, if the output could not be opened due to an
-	 *                     I/O problem.
-	 */
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		this.session = cluster.connect();
-		this.prepared = session.prepare(insertQuery);
-		this.callback = new FutureCallback<ResultSet>() {
-			@Override
-			public void onSuccess(ResultSet ignored) {
-			}
-
-			@Override
-			public void onFailure(Throwable t) {
-				exception = t;
-			}
-		};
-	}
-
-	@Override
-	public void writeRecord(OUT record) throws IOException {
-		if (exception != null) {
-			throw new IOException("write record failed", exception);
-		}
-
-		Object[] fields = new Object[record.getArity()];
-		for (int i = 0; i < record.getArity(); i++) {
-			fields[i] = record.getField(i);
-		}
-		ResultSetFuture result = session.executeAsync(prepared.bind(fields));
-		Futures.addCallback(result, callback);
-	}
-
-	/**
-	 * Closes all resources used.
-	 */
-	@Override
-	public void close() throws IOException {
-		try {
-			if (session != null) {
-				session.close();
-			}
-		} catch (Exception e) {
-			LOG.error("Error while closing session.", e);
-		}
-
-		try {
-			if (cluster != null ) {
-				cluster.close();
-			}
-		} catch (Exception e) {
-			LOG.error("Error while closing cluster.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
deleted file mode 100644
index 63b76da..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
- * database.
- * 
- * <p>Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
- */
-public class CassandraCommitter extends CheckpointCommitter {
-
-	private static final long serialVersionUID = 1L;
-	
-	private final ClusterBuilder builder;
-	private transient Cluster cluster;
-	private transient Session session;
-
-	private String keySpace = "flink_auxiliary";
-	private String table = "checkpoints_";
-
-	/**
-	 * A cache of the last committed checkpoint ids per subtask index. This is used to
-	 * avoid redundant round-trips to Cassandra (see {@link #isCheckpointCommitted(int, long)}.
-	 */
-	private final Map<Integer, Long> lastCommittedCheckpoints = new HashMap<>();
-
-	public CassandraCommitter(ClusterBuilder builder) {
-		this.builder = builder;
-		ClosureCleaner.clean(builder, true);
-	}
-
-	public CassandraCommitter(ClusterBuilder builder, String keySpace) {
-		this(builder);
-		this.keySpace = keySpace;
-	}
-
-	/**
-	 * Internally used to set the job ID after instantiation.
-	 */
-	public void setJobId(String id) throws Exception {
-		super.setJobId(id);
-		table += id;
-	}
-
-	/**
-	 * Generates the necessary tables to store information.
-	 *
-	 * @throws Exception
-	 */
-	@Override
-	public void createResource() throws Exception {
-		cluster = builder.getCluster();
-		session = cluster.connect();
-
-		session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':1};", keySpace));
-		session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table));
-
-		try {
-			session.close();
-		} catch (Exception e) {
-			LOG.error("Error while closing session.", e);
-		}
-		try {
-			cluster.close();
-		} catch (Exception e) {
-			LOG.error("Error while closing cluster.", e);
-		}
-	}
-
-	@Override
-	public void open() throws Exception {
-		if (builder == null) {
-			throw new RuntimeException("No ClusterBuilder was set.");
-		}
-		cluster = builder.getCluster();
-		session = cluster.connect();
-	}
-
-	@Override
-	public void close() throws Exception {
-		this.lastCommittedCheckpoints.clear();
-		try {
-			session.close();
-		} catch (Exception e) {
-			LOG.error("Error while closing session.", e);
-		}
-		try {
-			cluster.close();
-		} catch (Exception e) {
-			LOG.error("Error while closing cluster.", e);
-		}
-	}
-
-	@Override
-	public void commitCheckpoint(int subtaskIdx, long checkpointId) {
-		String statement = String.format(
-			"UPDATE %s.%s set checkpoint_id=%d where sink_id='%s' and sub_id=%d;",
-			keySpace, table, checkpointId, operatorId, subtaskIdx);
-
-		session.execute(statement);
-		lastCommittedCheckpoints.put(subtaskIdx, checkpointId);
-	}
-
-	@Override
-	public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
-		// Pending checkpointed buffers are committed in ascending order of their
-		// checkpoint id. This way we can tell if a checkpointed buffer was committed
-		// just by asking the third-party storage system for the last checkpoint id
-		// committed by the specified subtask.
-
-		Long lastCommittedCheckpoint = lastCommittedCheckpoints.get(subtaskIdx);
-		if (lastCommittedCheckpoint == null) {
-			String statement = String.format(
-				"SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;",
-				keySpace, table, operatorId, subtaskIdx);
-
-			Iterator<Row> resultIt = session.execute(statement).iterator();
-			if (resultIt.hasNext()) {
-				lastCommittedCheckpoint = resultIt.next().getLong("checkpoint_id");
-				lastCommittedCheckpoints.put(subtaskIdx, lastCommittedCheckpoint);
-			}
-		}
-		return lastCommittedCheckpoint != null && checkpointId <= lastCommittedCheckpoint;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
deleted file mode 100644
index 650c481..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.mapping.Mapper;
-import com.datastax.driver.mapping.MappingManager;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * Flink Sink to save data into a Cassandra cluster using 
- * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html">Mapper</a>,
- * which it uses annotations from
- * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/annotations/package-summary.html">
- * com.datastax.driver.mapping.annotations</a>.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
-
-	private static final long serialVersionUID = 1L;
-
-	protected final Class<IN> clazz;
-	protected transient Mapper<IN> mapper;
-	protected transient MappingManager mappingManager;
-
-	/**
-	 * The main constructor for creating CassandraPojoSink
-	 *
-	 * @param clazz Class<IN> instance
-	 */
-	public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) {
-		super(builder);
-		this.clazz = clazz;
-	}
-
-	@Override
-	public void open(Configuration configuration) {
-		super.open(configuration);
-		try {
-			this.mappingManager = new MappingManager(session);
-			this.mapper = mappingManager.mapper(clazz);
-		} catch (Exception e) {
-			throw new RuntimeException("Cannot create CassandraPojoSink with input: " + clazz.getSimpleName(), e);
-		}
-	}
-
-	@Override
-	public ListenableFuture<Void> send(IN value) {
-		return mapper.saveAsync(value);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
deleted file mode 100644
index 180b638..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.transformations.SinkTransformation;
-import org.apache.flink.streaming.api.transformations.StreamTransformation;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-
-/**
- * This class wraps different Cassandra sink implementations to provide a common interface for all of them.
- *
- * @param <IN> input type
- */
-public class CassandraSink<IN> {
-	private final boolean useDataStreamSink;
-	private DataStreamSink<IN> sink1;
-	private SingleOutputStreamOperator<IN> sink2;
-
-	private CassandraSink(DataStreamSink<IN> sink) {
-		sink1 = sink;
-		useDataStreamSink = true;
-	}
-
-	private CassandraSink(SingleOutputStreamOperator<IN> sink) {
-		sink2 = sink;
-		useDataStreamSink = false;
-	}
-
-	private SinkTransformation<IN> getSinkTransformation() {
-		return sink1.getTransformation();
-	}
-
-	private StreamTransformation<IN> getStreamTransformation() {
-		return sink2.getTransformation();
-	}
-
-	/**
-	 * Sets the name of this sink. This name is
-	 * used by the visualization and logging during runtime.
-	 *
-	 * @return The named sink.
-	 */
-	public CassandraSink<IN> name(String name) {
-		if (useDataStreamSink) {
-			getSinkTransformation().setName(name);
-		} else {
-			getStreamTransformation().setName(name);
-		}
-		return this;
-	}
-
-	/**
-	 * Sets an ID for this operator.
-	 * <p/>
-	 * <p>The specified ID is used to assign the same operator ID across job
-	 * submissions (for example when starting a job from a savepoint).
-	 * <p/>
-	 * <p><strong>Important</strong>: this ID needs to be unique per
-	 * transformation and job. Otherwise, job submission will fail.
-	 *
-	 * @param uid The unique user-specified ID of this transformation.
-	 * @return The operator with the specified ID.
-	 */
-	public CassandraSink<IN> uid(String uid) {
-		if (useDataStreamSink) {
-			getSinkTransformation().setUid(uid);
-		} else {
-			getStreamTransformation().setUid(uid);
-		}
-		return this;
-	}
-
-	/**
-	 * Sets the parallelism for this sink. The degree must be higher than zero.
-	 *
-	 * @param parallelism The parallelism for this sink.
-	 * @return The sink with set parallelism.
-	 */
-	public CassandraSink<IN> setParallelism(int parallelism) {
-		if (useDataStreamSink) {
-			getSinkTransformation().setParallelism(parallelism);
-		} else {
-			getStreamTransformation().setParallelism(parallelism);
-		}
-		return this;
-	}
-
-	/**
-	 * Turns off chaining for this operator so thread co-location will not be
-	 * used as an optimization.
-	 * <p/>
-	 * <p/>
-	 * Chaining can be turned off for the whole
-	 * job by {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()}
-	 * however it is not advised for performance considerations.
-	 *
-	 * @return The sink with chaining disabled
-	 */
-	public CassandraSink<IN> disableChaining() {
-		if (useDataStreamSink) {
-			getSinkTransformation().setChainingStrategy(ChainingStrategy.NEVER);
-		} else {
-			getStreamTransformation().setChainingStrategy(ChainingStrategy.NEVER);
-		}
-		return this;
-	}
-
-	/**
-	 * Sets the slot sharing group of this operation. Parallel instances of
-	 * operations that are in the same slot sharing group will be co-located in the same
-	 * TaskManager slot, if possible.
-	 * <p/>
-	 * <p>Operations inherit the slot sharing group of input operations if all input operations
-	 * are in the same slot sharing group and no slot sharing group was explicitly specified.
-	 * <p/>
-	 * <p>Initially an operation is in the default slot sharing group. An operation can be put into
-	 * the default group explicitly by setting the slot sharing group to {@code "default"}.
-	 *
-	 * @param slotSharingGroup The slot sharing group name.
-	 */
-	public CassandraSink<IN> slotSharingGroup(String slotSharingGroup) {
-		if (useDataStreamSink) {
-			getSinkTransformation().setSlotSharingGroup(slotSharingGroup);
-		} else {
-			getStreamTransformation().setSlotSharingGroup(slotSharingGroup);
-		}
-		return this;
-	}
-
-	/**
-	 * Writes a DataStream into a Cassandra database.
-	 *
-	 * @param input input DataStream
-	 * @param <IN>  input type
-	 * @return CassandraSinkBuilder, to further configure the sink
-	 */
-	public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
-		if (input.getType() instanceof TupleTypeInfo) {
-			DataStream<T> tupleInput = (DataStream<T>) input;
-			return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
-		} else {
-			return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
-		}
-	}
-
-	public abstract static class CassandraSinkBuilder<IN> {
-		protected final DataStream<IN> input;
-		protected final TypeSerializer<IN> serializer;
-		protected final TypeInformation<IN> typeInfo;
-		protected ClusterBuilder builder;
-		protected String query;
-		protected CheckpointCommitter committer;
-		protected boolean isWriteAheadLogEnabled;
-
-		public CassandraSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
-			this.input = input;
-			this.typeInfo = typeInfo;
-			this.serializer = serializer;
-		}
-
-		/**
-		 * Sets the query that is to be executed for every record.
-		 *
-		 * @param query query to use
-		 * @return this builder
-		 */
-		public CassandraSinkBuilder<IN> setQuery(String query) {
-			this.query = query;
-			return this;
-		}
-
-		/**
-		 * Sets the cassandra host to connect to.
-		 *
-		 * @param host host to connect to
-		 * @return this builder
-		 */
-		public CassandraSinkBuilder<IN> setHost(String host) {
-			return setHost(host, 9042);
-		}
-
-		/**
-		 * Sets the cassandra host/port to connect to.
-		 *
-		 * @param host host to connect to
-		 * @param port port to connect to
-		 * @return this builder
-		 */
-		public CassandraSinkBuilder<IN> setHost(final String host, final int port) {
-			if (this.builder != null) {
-				throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder().");
-			}
-			this.builder = new ClusterBuilder() {
-				@Override
-				protected Cluster buildCluster(Cluster.Builder builder) {
-					return builder.addContactPoint(host).withPort(port).build();
-				}
-			};
-			return this;
-		}
-
-		/**
-		 * Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the connection to cassandra.
-		 *
-		 * @param builder ClusterBuilder to configure the connection to cassandra
-		 * @return this builder
-		 */
-		public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) {
-			if (this.builder != null) {
-				throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder().");
-			}
-			this.builder = builder;
-			return this;
-		}
-
-		/**
-		 * Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use
-		 * idempotent updates.
-		 *
-		 * @return this builder
-		 */
-		public CassandraSinkBuilder<IN> enableWriteAheadLog() {
-			this.isWriteAheadLogEnabled = true;
-			return this;
-		}
-
-		/**
-		 * Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use
-		 * idempotent updates.
-		 *
-		 * @param committer CheckpointCommitter, that stores informationa bout completed checkpoints in an external
-		 *                  resource. By default this information is stored within a separate table within Cassandra.
-		 * @return this builder
-		 */
-		public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer) {
-			this.isWriteAheadLogEnabled = true;
-			this.committer = committer;
-			return this;
-		}
-
-		/**
-		 * Finalizes the configuration of this sink.
-		 *
-		 * @return finalized sink
-		 * @throws Exception
-		 */
-		public abstract CassandraSink<IN> build() throws Exception;
-
-		protected void sanityCheck() {
-			if (builder == null) {
-				throw new IllegalArgumentException("Cassandra host information must be supplied using either setHost() or setClusterBuilder().");
-			}
-		}
-	}
-
-	public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
-		public CassandraTupleSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
-			super(input, typeInfo, serializer);
-		}
-
-		@Override
-		protected void sanityCheck() {
-			super.sanityCheck();
-			if (query == null || query.length() == 0) {
-				throw new IllegalArgumentException("Query must not be null or empty.");
-			}
-		}
-
-		@Override
-		public CassandraSink<IN> build() throws Exception {
-			sanityCheck();
-			if (isWriteAheadLogEnabled) {
-				return committer == null
-					? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, new CassandraCommitter(builder))))
-					: new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, committer)));
-			} else {
-				return new CassandraSink<>(input.addSink(new CassandraTupleSink<IN>(query, builder)).name("Cassandra Sink"));
-			}
-		}
-	}
-
-	public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
-		public CassandraPojoSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
-			super(input, typeInfo, serializer);
-		}
-
-		@Override
-		protected void sanityCheck() {
-			super.sanityCheck();
-			if (query != null) {
-				throw new IllegalArgumentException("Specifying a query is not allowed when using a Pojo-Stream as input.");
-			}
-		}
-
-		@Override
-		public CassandraSink<IN> build() throws Exception {
-			sanityCheck();
-			if (isWriteAheadLogEnabled) {
-				throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
-			} else {
-				return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink"));
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
deleted file mode 100644
index 49b1efa..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
-	protected static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBase.class);
-	protected transient Cluster cluster;
-	protected transient Session session;
-
-	protected transient Throwable exception = null;
-	protected transient FutureCallback<V> callback;
-
-	private final ClusterBuilder builder;
-
-	protected CassandraSinkBase(ClusterBuilder builder) {
-		this.builder = builder;
-		ClosureCleaner.clean(builder, true);
-	}
-
-	@Override
-	public void open(Configuration configuration) {
-		this.callback = new FutureCallback<V>() {
-			@Override
-			public void onSuccess(V ignored) {
-			}
-
-			@Override
-			public void onFailure(Throwable t) {
-				exception = t;
-				LOG.error("Error while sending value.", t);
-			}
-		};
-		this.cluster = builder.getCluster();
-		this.session = cluster.connect();
-	}
-
-	@Override
-	public void invoke(IN value) throws Exception {
-		if (exception != null) {
-			throw new IOException("invoke() failed", exception);
-		}
-		ListenableFuture<V> result = send(value);
-		Futures.addCallback(result, callback);
-	}
-
-	public abstract ListenableFuture<V> send(IN value);
-
-	@Override
-	public void close() {
-		try {
-			if (session != null) {
-				session.close();
-			}
-		} catch (Exception e) {
-			LOG.error("Error while closing session.", e);
-		}
-		try {
-			if (cluster != null) {
-				cluster.close();
-			}
-		} catch (Exception e) {
-			LOG.error("Error while closing cluster.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
deleted file mode 100644
index 0a9ef06..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * Flink Sink to save data into a Cassandra cluster.
- *
- * @param <IN> Type of the elements emitted by this sink, it must extend {@link Tuple}
- */
-public class CassandraTupleSink<IN extends Tuple> extends CassandraSinkBase<IN, ResultSet> {
-	private final String insertQuery;
-	private transient PreparedStatement ps;
-
-	public CassandraTupleSink(String insertQuery, ClusterBuilder builder) {
-		super(builder);
-		this.insertQuery = insertQuery;
-	}
-
-	@Override
-	public void open(Configuration configuration) {
-		super.open(configuration);
-		this.ps = session.prepare(insertQuery);
-	}
-
-	@Override
-	public ListenableFuture<ResultSet> send(IN value) {
-		Object[] fields = extract(value);
-		return session.executeAsync(ps.bind(fields));
-	}
-
-	private Object[] extract(IN record) {
-		Object[] al = new Object[record.getArity()];
-		for (int i = 0; i < record.getArity(); i++) {
-			al[i] = record.getField(i);
-		}
-		return al;
-	}
-}