Posted to commits@flink.apache.org by rm...@apache.org on 2016/06/16 14:58:33 UTC

[4/4] flink git commit: [FLINK-3332] Add Exactly-Once Cassandra connector

[FLINK-3332] Add Exactly-Once Cassandra connector


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a28a2d09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a28a2d09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a28a2d09

Branch: refs/heads/master
Commit: a28a2d09626b02b18e846556945c70791e5c2502
Parents: 57ef6c3
Author: zentol <ch...@apache.org>
Authored: Tue Mar 8 17:00:24 2016 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 16 16:58:04 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/connectors/cassandra.md     | 103 +++++++
 docs/apis/streaming/connectors/index.md         |   3 +-
 docs/apis/streaming/fault_tolerance.md          |   5 +
 .../api/writer/ResultPartitionWriter.java       |   2 +-
 .../flink-connector-cassandra/pom.xml           | 163 +++++++++++
 .../cassandra/CassandraCommitter.java           | 126 +++++++++
 .../CassandraIdempotentExactlyOnceSink.java     | 116 ++++++++
 .../connectors/cassandra/CassandraSink.java     | 282 +++++++++++++++++++
 .../connectors/cassandra/ClusterBuilder.java    |  31 ++
 .../cassandra/CassandraConnectorTest.java       | 219 ++++++++++++++
 ...ssandraIdempotentExactlyOnceSinkExample.java |  88 ++++++
 .../src/test/resources/cassandra.yaml           |  43 +++
 .../src/test/resources/log4j-test.properties    |  29 ++
 flink-streaming-connectors/pom.xml              |   1 +
 .../runtime/operators/CheckpointCommitter.java  | 114 ++++++++
 .../operators/GenericAtLeastOnceSink.java       | 197 +++++++++++++
 .../operators/AtLeastOnceSinkTestBase.java      | 218 ++++++++++++++
 .../operators/GenericAtLeastOnceSinkTest.java   | 147 ++++++++++
 18 files changed, 1885 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/docs/apis/streaming/connectors/cassandra.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/cassandra.md b/docs/apis/streaming/connectors/cassandra.md
new file mode 100644
index 0000000..520354f
--- /dev/null
+++ b/docs/apis/streaming/connectors/cassandra.md
@@ -0,0 +1,103 @@
+---
+title: "Apache Cassandra Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 1
+sub-nav-title: Cassandra
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This connector provides sinks that write data into a [Cassandra](https://cassandra.apache.org/) database.
+
+To use this connector, add the following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-cassandra{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+#### Installing Apache Cassandra
+Follow the instructions from the [Cassandra Getting Started page](http://wiki.apache.org/cassandra/GettingStarted).
+
+#### Cassandra Sink
+
+Flink's Cassandra sinks are created with the static CassandraSink.addSink(DataStream<IN> input) method.
+This method returns a CassandraSinkBuilder, which offers methods to further configure the sink.
+
+The following configuration methods can be used:
+
+1. setQuery(String query)
+2. setConsistencyLevel(ConsistencyLevel level)
+3. setClusterBuilder(ClusterBuilder builder)
+4. setCheckpointCommitter(CheckpointCommitter committer)
+5. build()
+
+setQuery() sets the query that is executed for every value the sink receives.
+setConsistencyLevel() sets the desired consistency level (AT_LEAST_ONCE / EXACTLY_ONCE).
+setClusterBuilder() sets the cluster builder that is used to configure the connection to Cassandra.
+setCheckpointCommitter() optionally sets the committer used to track completed checkpoints for EXACTLY_ONCE processing.
+
+A checkpoint committer stores additional information about completed checkpoints
+in some external resource. You can use a `CassandraCommitter` to store these in a
+separate table in Cassandra. Note that this table will NOT be cleaned up by Flink.
+
+build() finalizes the configuration and returns the CassandraSink.
+
+Example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ClusterBuilder clusterBuilder = new ClusterBuilder() {
+  @Override
+  public Cluster buildCluster(Cluster.Builder builder) {
+    return builder.addContactPoint("127.0.0.1").build();
+  }
+};
+
+CassandraSink.addSink(input)
+  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
+  .setConsistencyLevel(CassandraSink.ConsistencyLevel.EXACTLY_ONCE)
+  .setCheckpointCommitter(new CassandraCommitter(clusterBuilder))
+  .setClusterBuilder(clusterBuilder)
+  .build();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val clusterBuilder = new ClusterBuilder() {
+  override def buildCluster(builder: Cluster.Builder): Cluster =
+    builder.addContactPoint("127.0.0.1").build()
+}
+
+CassandraSink.addSink(input)
+  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
+  .setConsistencyLevel(CassandraSink.ConsistencyLevel.EXACTLY_ONCE)
+  .setCheckpointCommitter(new CassandraCommitter(clusterBuilder))
+  .setClusterBuilder(clusterBuilder)
+  .build()
+{% endhighlight %}
+</div>
+</div>
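+
+The example above assumes that the keyspace `example` and the table `values` already exist.
+As a minimal sketch (not part of the connector; it assumes a locally running Cassandra node
+and a schema matching the insert query), they could be created once with the DataStax driver:
+
+{% highlight java %}
+Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
+Session session = cluster.connect();
+session.execute("CREATE KEYSPACE IF NOT EXISTS example WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};");
+session.execute("CREATE TABLE IF NOT EXISTS example.values (id text PRIMARY KEY, counter int);");
+session.close();
+cluster.close();
+{% endhighlight %}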

http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/docs/apis/streaming/connectors/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/index.md b/docs/apis/streaming/connectors/index.md
index 85a07a1..87182b7 100644
--- a/docs/apis/streaming/connectors/index.md
+++ b/docs/apis/streaming/connectors/index.md
@@ -4,7 +4,7 @@ title: "Streaming Connectors"
 # Sub-level navigation
 sub-nav-group: streaming
 sub-nav-id: connectors
-sub-nav-pos: 8
+sub-nav-pos: 6
 sub-nav-title: Connectors
 ---
 <!--
@@ -38,6 +38,7 @@ Currently these systems are supported:
  * [Amazon Kinesis Streams](http://aws.amazon.com/kinesis/streams/) (sink/source)
  * [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis) (source)
  * [Apache NiFi](https://nifi.apache.org) (sink/source)
+ * [Apache Cassandra](https://cassandra.apache.org/) (sink)
 
 To run an application using one of these connectors, additional third party
 components are usually required to be installed and launched, e.g. the servers

http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/docs/apis/streaming/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/fault_tolerance.md b/docs/apis/streaming/fault_tolerance.md
index 7f861fc..43e65a9 100644
--- a/docs/apis/streaming/fault_tolerance.md
+++ b/docs/apis/streaming/fault_tolerance.md
@@ -177,6 +177,11 @@ state updates) of Flink coupled with bundled sinks:
         <td></td>
     </tr>
     <tr>
+        <td>Cassandra sink</td>
+        <td>exactly-once</td>
+        <td>only for idempotent updates</td>
+    </tr>
+    <tr>
         <td>File sinks</td>
         <td>at least once</td>
         <td></td>

http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 79c21c6..cfab34d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -37,7 +37,7 @@ import java.io.IOException;
  * The {@link ResultPartitionWriter} is the runtime API for producing results. It
  * supports two kinds of data to be sent: buffers and events.
  */
-public final class ResultPartitionWriter implements EventListener<TaskEvent> {
+public class ResultPartitionWriter implements EventListener<TaskEvent> {
 
 	private final ResultPartition partition;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/pom.xml b/flink-streaming-connectors/flink-connector-cassandra/pom.xml
new file mode 100644
index 0000000..5242c13
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/pom.xml
@@ -0,0 +1,163 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors</artifactId>
+		<version>1.1-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-cassandra_2.10</artifactId>
+	<name>flink-connector-cassandra</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
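+	<!-- e.g. (hypothetical invocation): mvn clean install -Dcassandra.version=2.2.5 -Ddriver.version=3.0.0 -->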
+	<properties>
+		<cassandra.version>2.2.5</cassandra.version>
+		<driver.version>3.0.0</driver.version>
+	</properties>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<reuseForks>true</reuseForks>
+					<forkCount>1</forkCount>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>2.4.1</version>
+				<executions>
+					<!-- Run shade goal on package phase -->
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<includes>
+									<include>com.datastax.cassandra:cassandra-driver-core</include>
+									<include>com.datastax.cassandra:cassandra-driver-mapping</include>
+								</includes>
+							</artifactSet>
+							<relocations>
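+								<!-- rewrite the driver's com.google references to Flink's shaded Guava location to avoid version clashes -->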
+								<relocation>
+									<pattern>com.google</pattern>
+									<shadedPattern>org.apache.flink.shaded.com.google</shadedPattern>
+									<excludes>
+										<exclude>com.google.protobuf.**</exclude>
+										<exclude>com.google.inject.**</exclude>
+									</excludes>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.datastax.cassandra</groupId>
+			<artifactId>cassandra-driver-core</artifactId>
+			<version>${driver.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>log4j-over-slf4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>ch.qos.logback</groupId>
+					<artifactId>logback-classic</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>com.datastax.cassandra</groupId>
+			<artifactId>cassandra-driver-mapping</artifactId>
+			<version>${driver.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>log4j-over-slf4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>ch.qos.logback</groupId>
+					<artifactId>logback-classic</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>16.0.1</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.cassandra</groupId>
+			<artifactId>cassandra-all</artifactId>
+			<version>${cassandra.version}</version>
+			<scope>test</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>log4j-over-slf4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>ch.qos.logback</groupId>
+					<artifactId>logback-classic</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
new file mode 100644
index 0000000..c5d47e7
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+/**
+ * CheckpointCommitter that saves information about completed checkpoints within a separate table in a
+ * Cassandra database.
+ * <p/>
+ * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
+ */
+public class CassandraCommitter extends CheckpointCommitter {
+	private ClusterBuilder builder;
+	private transient Cluster cluster;
+	private transient Session session;
+
+	private static final String KEYSPACE = "flink_auxiliary";
+	private String TABLE = "checkpoints_";
+
+	private transient PreparedStatement deleteStatement;
+	private transient PreparedStatement updateStatement;
+	private transient PreparedStatement selectStatement;
+
+	public CassandraCommitter(ClusterBuilder builder) {
+		this.builder = builder;
+		ClosureCleaner.clean(builder, true);
+	}
+
+	/**
+	 * Internally used to set the job ID after instantiation.
+	 *
+	 * @param id
+	 * @throws Exception
+	 */
+	@Override
+	public void setJobId(String id) throws Exception {
+		super.setJobId(id);
+		TABLE += id;
+	}
+
+	/**
+	 * Generates the necessary tables to store information.
+	 *
+	 * @throws Exception
+	 */
+	@Override
+	public void createResource() throws Exception {
+		cluster = builder.getCluster();
+		session = cluster.connect();
+
+		session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", KEYSPACE));
+		session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", KEYSPACE, TABLE));
+
+		try {
+			session.close();
+		} catch (Exception e) {
+			LOG.error("Error while closing session.", e);
+		}
+		try {
+			cluster.close();
+		} catch (Exception e) {
+			LOG.error("Error while closing cluster.", e);
+		}
+	}
+
+	@Override
+	public void open() throws Exception {
+		if (builder == null) {
+			throw new RuntimeException("No ClusterBuilder was set.");
+		}
+		cluster = builder.getCluster();
+		session = cluster.connect();
+
+		deleteStatement = session.prepare(String.format("DELETE FROM %s.%s where sink_id='%s' and sub_id=%d;", KEYSPACE, TABLE, operatorId, subtaskId));
+		updateStatement = session.prepare(String.format("UPDATE %s.%s set checkpoint_id=? where sink_id='%s' and sub_id=%d;", KEYSPACE, TABLE, operatorId, subtaskId));
+		selectStatement = session.prepare(String.format("SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", KEYSPACE, TABLE, operatorId, subtaskId));
+
+		session.execute(String.format("INSERT INTO %s.%s (sink_id, sub_id, checkpoint_id) values ('%s', %d, " + -1 + ");", KEYSPACE, TABLE, operatorId, subtaskId));
+	}
+
+	@Override
+	public void close() throws Exception {
+		session.executeAsync(deleteStatement.bind());
+		try {
+			session.close();
+		} catch (Exception e) {
+			LOG.error("Error while closing session.", e);
+		}
+		try {
+			cluster.close();
+		} catch (Exception e) {
+			LOG.error("Error while closing cluster.", e);
+		}
+	}
+
+	@Override
+	public void commitCheckpoint(long checkpointID) {
+		session.execute(updateStatement.bind(checkpointID));
+	}
+
+	@Override
+	public boolean isCheckpointCommitted(long checkpointID) {
+		long lastId = session.execute(selectStatement.bind()).one().getLong("checkpoint_id");
+		return checkpointID <= lastId;
+	}
+}
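
A minimal usage sketch of the committer protocol above (not taken from this commit;
the job/operator IDs are hypothetical, and a locally running Cassandra node is assumed):

    ClusterBuilder clusterBuilder = new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("127.0.0.1").build();
        }
    };
    CassandraCommitter committer = new CassandraCommitter(clusterBuilder);
    committer.setJobId("job_42");        // appended to the "checkpoints_" table name
    committer.setOperatorId("sink");
    committer.setOperatorSubtaskId(0);
    committer.createResource();          // creates flink_auxiliary.checkpoints_job_42 if missing
    committer.open();                    // prepares statements, registers checkpoint_id = -1
    if (!committer.isCheckpointCommitted(7L)) {
        // re-send the data belonging to checkpoint 7, then mark it committed:
        committer.commitCheckpoint(7L);
    }
    committer.close();                   // also deletes this subtask's row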

http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraIdempotentExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraIdempotentExactlyOnceSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraIdempotentExactlyOnceSink.java
new file mode 100644
index 0000000..b9de9c4
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraIdempotentExactlyOnceSink.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericAtLeastOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink is integrated with the checkpointing
+ * mechanism and provides exactly-once guarantees for idempotent updates.
+ * <p/>
+ * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed
+ * once a checkpoint is completed.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class CassandraIdempotentExactlyOnceSink<IN extends Tuple> extends GenericAtLeastOnceSink<IN> {
+	protected transient Cluster cluster;
+	protected transient Session session;
+
+	private final String insertQuery;
+	private transient PreparedStatement preparedStatement;
+
+	private transient Throwable exception = null;
+	private transient FutureCallback<ResultSet> callback;
+
+	private ClusterBuilder builder;
+
+	protected CassandraIdempotentExactlyOnceSink(String insertQuery, TypeSerializer<IN> serializer, ClusterBuilder builder, String jobID, CheckpointCommitter committer) throws Exception {
+		super(committer, serializer, jobID);
+		this.insertQuery = insertQuery;
+		this.builder = builder;
+		ClosureCleaner.clean(builder, true);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		this.callback = new FutureCallback<ResultSet>() {
+			@Override
+			public void onSuccess(ResultSet resultSet) {
+			}
+
+			@Override
+			public void onFailure(Throwable throwable) {
+				exception = throwable;
+			}
+		};
+		cluster = builder.getCluster();
+		session = cluster.connect();
+		preparedStatement = session.prepare(insertQuery);
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		try {
+			session.close();
+		} catch (Exception e) {
+			LOG.error("Error while closing session.", e);
+		}
+		try {
+			cluster.close();
+		} catch (Exception e) {
+			LOG.error("Error while closing cluster.", e);
+		}
+	}
+
+	@Override
+	protected void sendValue(Iterable<IN> values, long timestamp) throws Exception {
+		//verify that no query failed until now
+		if (exception != null) {
+			throw new Exception(exception);
+		}
+		//set values for prepared statement
+		for (IN value : values) {
+			Object[] fields = new Object[value.getArity()];
+			for (int x = 0; x < value.getArity(); x++) {
+				fields[x] = value.getField(x);
+			}
+			//insert values and send to cassandra
+			BoundStatement s = preparedStatement.bind(fields);
+			s.setDefaultTimestamp(timestamp);
+			ResultSetFuture result = session.executeAsync(s);
+			if (result != null) {
+				//add callback to detect errors
+				Futures.addCallback(result, callback);
+			}
+		}
+	}
+}
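
The exactly-once guarantee above relies on idempotent replays: sendValue() stamps every
write with the timestamp it is given, and Cassandra reconciles writes per cell by
timestamp, so re-executing the same statement with the same timestamp has no additional
effect. A minimal sketch of that property (assuming a locally running node and the
example.values table from the docs):

    Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
    Session session = cluster.connect();
    PreparedStatement ps = session.prepare("INSERT INTO example.values (id, counter) values (?, ?);");
    BoundStatement stmt = ps.bind("some-id", 1);
    stmt.setDefaultTimestamp(42L);  // e.g. the timestamp handed to sendValue()
    session.execute(stmt);          // original attempt
    session.execute(stmt);          // replay: same cells, same timestamp -> no duplicate effect
    session.close();
    cluster.close();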

http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
new file mode 100644
index 0000000..012ece5
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+import java.util.UUID;
+
+/**
+ * This class wraps different Cassandra sink implementations to provide a common interface for all of them.
+ *
+ * @param <IN> input type
+ */
+public class CassandraSink<IN> {
+	private static final String jobID = UUID.randomUUID().toString().replace("-", "_");
+	private final boolean useDataStreamSink;
+	private DataStreamSink<IN> sink1;
+	private SingleOutputStreamOperator<IN> sink2;
+
+	private CassandraSink(DataStreamSink<IN> sink) {
+		sink1 = sink;
+		useDataStreamSink = true;
+	}
+
+	private CassandraSink(SingleOutputStreamOperator<IN> sink) {
+		sink2 = sink;
+		useDataStreamSink = false;
+	}
+
+	private SinkTransformation<IN> getSinkTransformation() {
+		return sink1.getTransformation();
+	}
+
+	private StreamTransformation<IN> getStreamTransformation() {
+		return sink2.getTransformation();
+	}
+
+	/**
+	 * Sets the name of this sink. This name is
+	 * used by the visualization and logging during runtime.
+	 *
+	 * @return The named sink.
+	 */
+	public CassandraSink<IN> name(String name) {
+		if (useDataStreamSink) {
+			getSinkTransformation().setName(name);
+		} else {
+			getStreamTransformation().setName(name);
+		}
+		return this;
+	}
+
+	/**
+	 * Sets an ID for this operator.
+	 * <p/>
+	 * <p>The specified ID is used to assign the same operator ID across job
+	 * submissions (for example when starting a job from a savepoint).
+	 * <p/>
+	 * <p><strong>Important</strong>: this ID needs to be unique per
+	 * transformation and job. Otherwise, job submission will fail.
+	 *
+	 * @param uid The unique user-specified ID of this transformation.
+	 * @return The operator with the specified ID.
+	 */
+	public CassandraSink<IN> uid(String uid) {
+		if (useDataStreamSink) {
+			getSinkTransformation().setUid(uid);
+		} else {
+			getStreamTransformation().setUid(uid);
+		}
+		return this;
+	}
+
+	/**
+	 * Sets the parallelism for this sink. The parallelism must be greater than zero.
+	 *
+	 * @param parallelism The parallelism for this sink.
+	 * @return The sink with set parallelism.
+	 */
+	public CassandraSink<IN> setParallelism(int parallelism) {
+		if (useDataStreamSink) {
+			getSinkTransformation().setParallelism(parallelism);
+		} else {
+			getStreamTransformation().setParallelism(parallelism);
+		}
+		return this;
+	}
+
+	/**
+	 * Turns off chaining for this operator so thread co-location will not be
+	 * used as an optimization.
+	 * <p/>
+	 * <p/>
+	 * Chaining can be turned off for the whole
+	 * job via {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()},
+	 * however this is generally not advised for performance reasons.
+	 *
+	 * @return The sink with chaining disabled
+	 */
+	public CassandraSink<IN> disableChaining() {
+		if (useDataStreamSink) {
+			getSinkTransformation().setChainingStrategy(ChainingStrategy.NEVER);
+		} else {
+			getStreamTransformation().setChainingStrategy(ChainingStrategy.NEVER);
+		}
+		return this;
+	}
+
+	/**
+	 * Sets the slot sharing group of this operation. Parallel instances of
+	 * operations that are in the same slot sharing group will be co-located in the same
+	 * TaskManager slot, if possible.
+	 * <p/>
+	 * <p>Operations inherit the slot sharing group of input operations if all input operations
+	 * are in the same slot sharing group and no slot sharing group was explicitly specified.
+	 * <p/>
+	 * <p>Initially an operation is in the default slot sharing group. An operation can be put into
+	 * the default group explicitly by setting the slot sharing group to {@code "default"}.
+	 *
+	 * @param slotSharingGroup The slot sharing group name.
+	 */
+	public CassandraSink<IN> slotSharingGroup(String slotSharingGroup) {
+		if (useDataStreamSink) {
+			getSinkTransformation().setSlotSharingGroup(slotSharingGroup);
+		} else {
+			getStreamTransformation().setSlotSharingGroup(slotSharingGroup);
+		}
+		return this;
+	}
+
+	/**
+	 * Writes a DataStream into a Cassandra database.
+	 *
+	 * @param input input DataStream
+	 * @param <IN>  input type
+	 * @return CassandraSinkBuilder, to further configure the sink
+	 */
+	public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
+		if (input.getType() instanceof TupleTypeInfo) {
+			DataStream<T> tupleInput = (DataStream<T>) input;
+			return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
+		} else {
+			throw new IllegalArgumentException("POJOs are currently not supported.");
+		}
+	}
+
+	public enum ConsistencyLevel {
+		AT_LEAST_ONCE,
+		EXACTLY_ONCE
+	}
+
+	public abstract static class CassandraSinkBuilder<IN> {
+		protected final DataStream<IN> input;
+		protected final TypeSerializer<IN> serializer;
+		protected final TypeInformation<IN> typeInfo;
+		protected ConsistencyLevel consistency = ConsistencyLevel.AT_LEAST_ONCE;
+		protected ClusterBuilder builder;
+		protected String query;
+		protected CheckpointCommitter committer;
+
+		public CassandraSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
+			this.input = input;
+			this.typeInfo = typeInfo;
+			this.serializer = serializer;
+		}
+
+		/**
+		 * Sets the query that is to be executed for every record.
+		 * This parameter is mandatory.
+		 *
+		 * @param query query to use
+		 * @return this builder
+		 */
+		public CassandraSinkBuilder<IN> setQuery(String query) {
+			this.query = query;
+			return this;
+		}
+
+		public CassandraSinkBuilder<IN> setHost(String host) {
+			return setHost(host, 9042);
+		}
+
+		public CassandraSinkBuilder<IN> setHost(final String host, final int port) {
+			builder = new ClusterBuilder() {
+				@Override
+				protected Cluster buildCluster(Cluster.Builder builder) {
+					return builder.addContactPoint(host).withPort(port).build();
+				}
+			};
+			return this;
+		}
+
+		/**
+		 * Specifies the desired consistency level for this sink. Different sink implementations may be used depending
+		 * on this parameter.
+		 * This parameter is mandatory.
+		 *
+		 * @param consistency desired consistency level
+		 * @return this builder
+		 */
+		public CassandraSinkBuilder<IN> setConsistencyLevel(ConsistencyLevel consistency) {
+			this.consistency = consistency;
+			return this;
+		}
+
+		/**
+		 * Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the connection to Cassandra.
+		 * This field is mandatory.
+		 *
+		 * @param builder ClusterBuilder to configure the connection to Cassandra
+		 * @return this builder
+		 */
+		public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) {
+			this.builder = builder;
+			return this;
+		}
+
+		/**
+		 * Sets the CheckpointCommitter for this sink. A CheckpointCommitter stores information about completed checkpoints
+		 * in a resource outside of Flink.
+		 * If the desired consistency level is EXACTLY_ONCE and this field is not set, a default committer will be used.
+		 *
+		 * @param committer
+		 * @return this builder
+		 */
+		public CassandraSinkBuilder<IN> setCheckpointCommitter(CheckpointCommitter committer) {
+			this.committer = committer;
+			return this;
+		}
+
+		/**
+		 * Finalizes the configuration of this sink.
+		 *
+		 * @return finalized sink
+		 * @throws Exception
+		 */
+		public abstract CassandraSink<IN> build() throws Exception;
+	}
+
+	public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
+		public CassandraTupleSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
+			super(input, typeInfo, serializer);
+		}
+
+		@Override
+		public CassandraSink<IN> build() throws Exception {
+			if (consistency == ConsistencyLevel.EXACTLY_ONCE) {
+				return committer == null
+					? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraIdempotentExactlyOnceSink<>(query, serializer, builder, jobID, new CassandraCommitter(builder))))
+					: new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraIdempotentExactlyOnceSink<>(query, serializer, builder, jobID, committer)));
+			} else {
+				throw new IllegalArgumentException("There is currently no dedicated support for at-least-once guarantees.");
+			}
+		}
+	}
+}
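
For a single contact point, the setHost() shorthand above removes the need for an
explicit ClusterBuilder; a minimal sketch (input and the example.values table as in
the docs):

    CassandraSink.addSink(input)
        .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
        .setConsistencyLevel(CassandraSink.ConsistencyLevel.EXACTLY_ONCE)
        .setHost("127.0.0.1")  // defaults to port 9042
        .build();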

http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
new file mode 100644
index 0000000..d3db435
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+
+import java.io.Serializable;
+
+public abstract class ClusterBuilder implements Serializable {
+
+	public Cluster getCluster() {
+		return buildCluster(Cluster.builder());
+	}
+
+	protected abstract Cluster buildCluster(Cluster.Builder builder);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
new file mode 100644
index 0000000..bb7c0fb
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.runtime.operators.AtLeastOnceSinkTestBase;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Scanner;
+import java.util.UUID;
+
+public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<String, Integer, Integer>, CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>>> {
+	private static File tmpDir;
+
+	private static final boolean EMBEDDED = true;
+	private static EmbeddedCassandraService cassandra;
+	private static Cluster cluster;
+	private static Session session;
+
+	private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
+	private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;";
+	private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);";
+	private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;";
+	private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)";
+	private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;";
+
+	private static class EmbeddedCassandraService {
+		CassandraDaemon cassandraDaemon;
+
+		public void start() throws IOException {
+			this.cassandraDaemon = new CassandraDaemon();
+			this.cassandraDaemon.init(null);
+			this.cassandraDaemon.start();
+		}
+
+		public void stop() {
+			this.cassandraDaemon.stop();
+		}
+	}
+
+	@BeforeClass
+	public static void startCassandra() throws IOException {
+		//generate temporary files
+		tmpDir = CommonTestUtils.createTempDirectory();
+		ClassLoader classLoader = CassandraIdempotentExactlyOnceSink.class.getClassLoader();
+		File file = new File(classLoader.getResource("cassandra.yaml").getFile());
+		File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
+		tmp.createNewFile();
+		BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
+
+		//copy cassandra.yaml; inject absolute paths into cassandra.yaml
+		Scanner scanner = new Scanner(file);
+		while (scanner.hasNextLine()) {
+			String line = scanner.nextLine();
+			line = line.replace("$PATH", "'" + tmp.getParentFile());
+			b.write(line + "\n");
+			b.flush();
+		}
+		scanner.close();
+
+
+		// Tell cassandra where the configuration files are.
+		// Use the test configuration file.
+		System.setProperty("cassandra.config", "file:" + File.separator + File.separator + File.separator + tmp.getAbsolutePath());
+
+		if (EMBEDDED) {
+			cassandra = new EmbeddedCassandraService();
+			cassandra.start();
+		}
+
+		cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
+		session = cluster.connect();
+
+		session.execute(CREATE_KEYSPACE_QUERY);
+		session.execute(CREATE_TABLE_QUERY);
+	}
+
+	@After
+	public void deleteSchema() throws Exception {
+		session.executeAsync(CLEAR_TABLE_QUERY);
+	}
+
+	@AfterClass
+	public static void closeCassandra() {
+		session.executeAsync(DROP_KEYSPACE_QUERY);
+		session.close();
+		cluster.close();
+		if (EMBEDDED) {
+			cassandra.stop();
+		}
+		tmpDir.delete();
+	}
+
+	@Override
+	protected CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>> createSink() {
+		try {
+			return new CassandraIdempotentExactlyOnceSink<>(
+				INSERT_DATA_QUERY,
+				TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
+				new ClusterBuilder() {
+					@Override
+					protected Cluster buildCluster(Cluster.Builder builder) {
+						return builder.addContactPoint("127.0.0.1").build();
+					}
+				},
+				"testJob",
+				new CassandraCommitter(new ClusterBuilder() {
+					@Override
+					protected Cluster buildCluster(Cluster.Builder builder) {
+						return builder.addContactPoint("127.0.0.1").build();
+					}
+				}));
+		} catch (Exception e) {
+			Assert.fail("Failure while initializing sink: " + e.getMessage());
+			return null;
+		}
+	}
+
+	@Override
+	protected TupleTypeInfo<Tuple3<String, Integer, Integer>> createTypeInfo() {
+		return TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Integer.class);
+	}
+
+	@Override
+	protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) {
+		return new Tuple3<>("" + UUID.randomUUID(), counter, checkpointID);
+	}
+
+	@Override
+	protected void verifyResultsIdealCircumstances(
+		OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
+		OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
+		CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>> sink) {
+
+		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		ArrayList<Integer> list = new ArrayList<>();
+		for (int x = 1; x <= 60; x++) {
+			list.add(x);
+		}
+
+		for (Row s : result) {
+			list.remove(new Integer(s.getInt("counter")));
+		}
+		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
+	}
+
+	@Override
+	protected void verifyResultsDataPersistenceUponMissedNotify(
+		OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
+		OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
+		CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>> sink) {
+
+		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		ArrayList<Integer> list = new ArrayList<>();
+		for (int x = 1; x <= 60; x++) {
+			list.add(x);
+		}
+
+		for (Row s : result) {
+			list.remove(new Integer(s.getInt("counter")));
+		}
+		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
+	}
+
+	@Override
+	protected void verifyResultsDataDiscardingUponRestore(
+		OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
+		OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
+		CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>> sink) {
+
+		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		ArrayList<Integer> list = new ArrayList<>();
+		for (int x = 1; x <= 20; x++) {
+			list.add(x);
+		}
+		for (int x = 41; x <= 60; x++) {
+			list.add(x);
+		}
+
+		for (Row s : result) {
+			list.remove(new Integer(s.getInt("counter")));
+		}
+		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraIdempotentExactlyOnceSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraIdempotentExactlyOnceSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraIdempotentExactlyOnceSinkExample.java
new file mode 100644
index 0000000..5a0b299
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraIdempotentExactlyOnceSinkExample.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra.example;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import java.util.UUID;
+
+public class CassandraIdempotentExactlyOnceSinkExample {
+	public static void main(String[] args) throws Exception {
+
+		class MySource implements SourceFunction<Tuple2<String, Integer>>, Checkpointed<Integer> {
+			private int counter = 0;
+			private boolean stop = false;
+
+			@Override
+			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+				while (!stop) {
+					Thread.sleep(50);
+					ctx.collect(new Tuple2<>("" + UUID.randomUUID(), 1));
+					counter++;
+					if (counter == 100) {
+						stop = true;
+					}
+				}
+			}
+
+			@Override
+			public void cancel() {
+				stop = true;
+			}
+
+			@Override
+			public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+				return counter;
+			}
+
+			@Override
+			public void restoreState(Integer state) throws Exception {
+				this.counter = state;
+			}
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.enableCheckpointing(1000);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
+		env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend"));
+
+		CassandraSink<Tuple2<String, Integer>> sink = CassandraSink.addSink(env.addSource(new MySource()))
+			.setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
+			.setConsistencyLevel(CassandraSink.ConsistencyLevel.EXACTLY_ONCE)
+			.setClusterBuilder(new ClusterBuilder() {
+				@Override
+				public Cluster buildCluster(Cluster.Builder builder) {
+					return builder.addContactPoint("127.0.0.1").build();
+				}
+			})
+			.build();
+
+		sink.name("Cassandra Sink").disableChaining().setParallelism(1).uid("hello");
+
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
new file mode 100644
index 0000000..32c18ea
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
@@ -0,0 +1,43 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+cluster_name: 'Test Cluster'
+commitlog_sync: 'periodic'
+commitlog_sync_period_in_ms: 10000
+commitlog_segment_size_in_mb: 16
+partitioner: 'org.apache.cassandra.dht.RandomPartitioner'
+endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch'
+commitlog_directory: $PATH\commit'
+data_file_directories:
+    - $PATH\data'
+saved_caches_directory: $PATH\cache'
+listen_address: '127.0.0.1'
+seed_provider:
+    - class_name: 'org.apache.cassandra.locator.SimpleSeedProvider'
+      parameters:
+          - seeds: '127.0.0.1'
+native_transport_port: 9042
+
+concurrent_reads: 8
+concurrent_writes: 8
+
+auto_bootstrap: false
+auto_snapshot: false
+
+start_rpc: false
+start_native_transport: true
+native_transport_max_threads: 8

http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..6d70ebd
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=ERROR, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/pom.xml b/flink-streaming-connectors/pom.xml
index 2bfd9b2..91691cf 100644
--- a/flink-streaming-connectors/pom.xml
+++ b/flink-streaming-connectors/pom.xml
@@ -45,6 +45,7 @@ under the License.
 		<module>flink-connector-rabbitmq</module>
 		<module>flink-connector-twitter</module>
 		<module>flink-connector-nifi</module>
+		<module>flink-connector-cassandra</module>
 	</modules>
 
 	<!-- See main pom.xml for explanation of profiles -->

http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java
new file mode 100644
index 0000000..22e629e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * This class is used to save information about which sink operator instance has committed checkpoints to a backend.
+ * <p/>
+ * The current checkpointing mechanism is ill-suited for sinks relying on backends that do not support roll-backs.
+ * When aiming for exactly-once semantics with such a system, one may neither commit data while creating the
+ * snapshot (since another sink instance may fail, leading to a replay of the same data) nor upon receiving a
+ * checkpoint-complete notification (since a subsequent failure would leave us with no knowledge as to whether
+ * data was committed or not).
+ * <p/>
+ * A CheckpointCommitter can be used to solve the second problem by saving whether an instance committed all data
+ * belonging to a checkpoint. This data must be stored in a backend that is persistent across retries (which rules
+ * out Flink's state mechanism) and accessible from all machines, like a database or a distributed file system.
+ * <p/>
+ * There is no mandate as to how the resource is shared; there may be one resource for all Flink jobs, or one for
+ * each job, operator, or operator instance separately. This implies that the resource must not be cleaned up by
+ * the system itself, and as such should be kept as small as possible. (A minimal implementation sketch follows
+ * this file's diff.)
+ */
+public abstract class CheckpointCommitter implements Serializable {
+	protected static final Logger LOG = LoggerFactory.getLogger(CheckpointCommitter.class);
+	protected String jobId;
+	protected String operatorId;
+	protected int subtaskId;
+
+	/**
+	 * Internally used to set the job ID after instantiation.
+	 *
+	 * @param id the ID of the job this committer belongs to
+	 * @throws Exception
+	 */
+	public void setJobId(String id) throws Exception {
+		this.jobId = id;
+	}
+
+	/**
+	 * Internally used to set the operator ID after instantiation.
+	 *
+	 * @param id the ID of the operator this committer belongs to
+	 * @throws Exception
+	 */
+	public void setOperatorId(String id) throws Exception {
+		this.operatorId = id;
+	}
+
+	/**
+	 * Internally used to set the operator subtask ID after instantiation.
+	 *
+	 * @param id the index of the operator subtask this committer belongs to
+	 * @throws Exception
+	 */
+	public void setOperatorSubtaskId(int id) throws Exception {
+		this.subtaskId = id;
+	}
+
+	/**
+	 * Opens/connects to the resource, and possibly creates it beforehand.
+	 *
+	 * @throws Exception
+	 */
+	public abstract void open() throws Exception;
+
+	/**
+	 * Closes the connection to the resource. The resource itself should generally still exist after this call.
+	 *
+	 * @throws Exception
+	 */
+	public abstract void close() throws Exception;
+
+	/**
+	 * Creates the resource that is used to store the commit information. Called once, directly after instantiation.
+	 * @throws Exception
+	 */
+	public abstract void createResource() throws Exception;
+
+	/**
+	 * Marks the given checkpoint as completed in the resource.
+	 *
+	 * @param checkpointID the ID of the checkpoint to mark as completed
+	 * @throws Exception
+	 */
+	public abstract void commitCheckpoint(long checkpointID) throws Exception;
+
+	/**
+	 * Checks the resource to determine whether the given checkpoint was committed completely.
+	 *
+	 * @param checkpointID the ID of the checkpoint to check
+	 * @return true if the checkpoint was committed completely, false otherwise
+	 * @throws Exception
+	 */
+	public abstract boolean isCheckpointCommitted(long checkpointID) throws Exception;
+}

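To make the committer contract concrete, here is a minimal sketch that keeps the highest committed checkpoint ID in a local marker file. This is not part of the commit: FileCheckpointCommitter and its path are hypothetical, and a local file only survives retries on a single machine, so a real deployment would use shared storage (as the CassandraCommitter added by this commit does).

	import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;

	import java.nio.charset.StandardCharsets;
	import java.nio.file.Files;
	import java.nio.file.Path;
	import java.nio.file.Paths;

	public class FileCheckpointCommitter extends CheckpointCommitter {
		private transient Path committedFile;

		@Override
		public void createResource() throws Exception {
			// nothing to create up front; the marker file is written lazily on the first commit
		}

		@Override
		public void open() throws Exception {
			// one marker file per job/operator/subtask, derived from the fields set by the sink
			committedFile = Paths.get("/tmp", jobId + "_" + operatorId + "_" + subtaskId);
		}

		@Override
		public void close() throws Exception {
			// intentionally a no-op: the marker file must survive so that later attempts can read it
		}

		@Override
		public void commitCheckpoint(long checkpointID) throws Exception {
			// checkpoints complete in ascending order, so storing only the latest ID suffices
			Files.write(committedFile, String.valueOf(checkpointID).getBytes(StandardCharsets.UTF_8));
		}

		@Override
		public boolean isCheckpointCommitted(long checkpointID) throws Exception {
			if (!Files.exists(committedFile)) {
				return false;
			}
			String lastCommitted = new String(Files.readAllBytes(committedFile), StandardCharsets.UTF_8);
			return checkpointID <= Long.parseLong(lastCommitted);
		}
	}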
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java
new file mode 100644
index 0000000..eb4aec3
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+/**
+ * Generic sink that emits its input elements into an arbitrary backend. This sink is integrated with the checkpointing
+ * mechanism and can provide exactly-once guarantees, depending on the storage backend and sink/committer implementation.
+ * <p/>
+ * Incoming records are stored within an {@link org.apache.flink.runtime.state.AbstractStateBackend}, and are only
+ * committed once a checkpoint is completed. (A usage sketch follows this file's diff.)
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public abstract class GenericAtLeastOnceSink<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
+	protected static final Logger LOG = LoggerFactory.getLogger(GenericAtLeastOnceSink.class);
+	private final CheckpointCommitter committer;
+	private transient AbstractStateBackend.CheckpointStateOutputView out;
+	protected final TypeSerializer<IN> serializer;
+	private final String id;
+
+	private ExactlyOnceState state = new ExactlyOnceState();
+
+	public GenericAtLeastOnceSink(CheckpointCommitter committer, TypeSerializer<IN> serializer, String jobID) throws Exception {
+		this.committer = committer;
+		this.serializer = serializer;
+		this.id = UUID.randomUUID().toString();
+		this.committer.setJobId(jobID);
+		this.committer.createResource();
+	}
+
+	@Override
+	public void open() throws Exception {
+		committer.setOperatorId(id);
+		committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
+		committer.open();
+	}
+
+	@Override
+	public void close() throws Exception {
+		committer.close();
+	}
+
+	/**
+	 * Saves a handle in the state.
+	 *
+	 * @param checkpointId the ID of the checkpoint the handle belongs to
+	 * @param timestamp the timestamp of that checkpoint
+	 * @throws IOException
+	 */
+	private void saveHandleInState(final long checkpointId, final long timestamp) throws IOException {
+		synchronized (state) {
+			//only add handle if a new OperatorState was created since the last snapshot
+			if (out != null) {
+				StateHandle<DataInputView> handle = out.closeAndGetHandle();
+				state.pendingHandles.put(checkpointId, new Tuple2<>(timestamp, handle));
+				out = null;
+			}
+		}
+	}
+
+	@Override
+	public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception {
+		synchronized (state) {
+			StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+			saveHandleInState(checkpointId, timestamp);
+
+			taskState.setFunctionState(state);
+			return taskState;
+		}
+	}
+
+	@Override
+	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
+		super.restoreState(state, recoveryTimestamp);
+		this.state = (ExactlyOnceState) state.getFunctionState();
+		out = null;
+	}
+
+	@Override
+	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
+		super.notifyOfCompletedCheckpoint(checkpointId);
+
+		synchronized (state.pendingHandles) {
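+			//re-send and commit all pending checkpoints up to (and including) the one that just completed;
+			//checkpoints already marked as committed in the resource are only discarded, not re-sent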
+			Set<Long> pastCheckpointIds = state.pendingHandles.keySet();
+			Set<Long> checkpointsToRemove = new HashSet<>();
+			for (Long pastCheckpointId : pastCheckpointIds) {
+				if (pastCheckpointId <= checkpointId) {
+					if (!committer.isCheckpointCommitted(pastCheckpointId)) {
+						Tuple2<Long, StateHandle<DataInputView>> handle = state.pendingHandles.get(pastCheckpointId);
+						DataInputView in = handle.f1.getState(getUserCodeClassloader());
+						sendValue(new ReusingMutableToRegularIteratorWrapper<>(new InputViewIterator<>(in, serializer), serializer), handle.f0);
+						committer.commitCheckpoint(pastCheckpointId);
+					}
+					checkpointsToRemove.add(pastCheckpointId);
+				}
+			}
+			for (Long toRemove : checkpointsToRemove) {
+				Tuple2<Long, StateHandle<DataInputView>> handle = state.pendingHandles.get(toRemove);
+				state.pendingHandles.remove(toRemove);
+				handle.f1.discardState();
+			}
+		}
+	}
+
+
+	/**
+	 * Writes the given values into the backend.
+	 *
+	 * @param value the values to be written
+	 * @param timestamp the timestamp of the checkpoint the values belong to
+	 * @throws Exception
+	 */
+	protected abstract void sendValue(Iterable<IN> value, long timestamp) throws Exception;
+
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		IN value = element.getValue();
+		//generate initial operator state
+		if (out == null) {
+			out = getStateBackend().createCheckpointStateOutputView(0, 0);
+		}
+		serializer.serialize(value, out);
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		//don't do anything, since we are a sink
+	}
+
+	/**
+	 * This state is used to keep a list of all StateHandles (essentially references to past OperatorStates) that were
+	 * used since the last completed checkpoint.
+	 **/
+	public class ExactlyOnceState implements StateHandle<Serializable> {
+		protected TreeMap<Long, Tuple2<Long, StateHandle<DataInputView>>> pendingHandles;
+
+		public ExactlyOnceState() {
+			pendingHandles = new TreeMap<>();
+		}
+
+		@Override
+		public TreeMap<Long, Tuple2<Long, StateHandle<DataInputView>>> getState(ClassLoader userCodeClassLoader) throws Exception {
+			return pendingHandles;
+		}
+
+		@Override
+		public void discardState() throws Exception {
+			for (Tuple2<Long, StateHandle<DataInputView>> pair : pendingHandles.values()) {
+				pair.f1.discardState();
+			}
+			pendingHandles = new TreeMap<>();
+		}
+
+		@Override
+		public long getStateSize() throws Exception {
+			long stateSize = 0;
+			for (Tuple2<Long, StateHandle<DataInputView>> pair : pendingHandles.values()) {
+				stateSize += pair.f1.getStateSize();
+			}
+			return stateSize;
+		}
+	}
+}

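A minimal sketch (again not part of this commit) of a concrete subclass: only sendValue has to be implemented, and the delivery guarantee hinges on whether that write is idempotent. A non-idempotent write like the one below yields at-least-once, while idempotent writes (as in the Cassandra sink) upgrade it to exactly-once. Such an operator would typically be attached to a stream via DataStream#transform; the class name and job ID are illustrative:

	import org.apache.flink.api.common.typeutils.TypeSerializer;
	import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
	import org.apache.flink.streaming.runtime.operators.GenericAtLeastOnceSink;

	public class PrintingAtLeastOnceSink<IN> extends GenericAtLeastOnceSink<IN> {

		public PrintingAtLeastOnceSink(CheckpointCommitter committer, TypeSerializer<IN> serializer) throws Exception {
			// the job ID ties the committer's entries to this job
			super(committer, serializer, "example-job");
		}

		@Override
		protected void sendValue(Iterable<IN> values, long timestamp) throws Exception {
			// invoked with all records belonging to a completed checkpoint; may run again
			// for the same records if a failure occurs before commitCheckpoint() succeeds
			for (IN value : values) {
				System.out.println(timestamp + ": " + value);
			}
		}
	}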
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/AtLeastOnceSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/AtLeastOnceSinkTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/AtLeastOnceSinkTestBase.java
new file mode 100644
index 0000000..1f2fa54
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/AtLeastOnceSinkTestBase.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+
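+/**
+ * Test base for sinks extending {@link GenericAtLeastOnceSink}. Verifies that all values are committed under ideal
+ * circumstances, that values are persisted and re-sent when a checkpoint-complete notification is missed, and that
+ * pending values written after a restored checkpoint are discarded upon restore.
+ */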
+public abstract class AtLeastOnceSinkTestBase<IN, S extends GenericAtLeastOnceSink<IN>> {
+
+	protected class OperatorExposingTask<IN> extends OneInputStreamTask<IN, IN> {
+		public OneInputStreamOperator<IN, IN> getOperator() {
+			return this.headOperator;
+		}
+	}
+
+	protected OperatorExposingTask<IN> createTask() {
+		return new OperatorExposingTask<>();
+	}
+
+	protected abstract S createSink() throws Exception;
+
+	protected abstract TypeInformation<IN> createTypeInfo();
+
+	protected abstract IN generateValue(int counter, int checkpointID);
+
+	protected abstract void verifyResultsIdealCircumstances(
+		OneInputStreamTaskTestHarness<IN, IN> harness, OneInputStreamTask<IN, IN> task, S sink) throws Exception;
+
+	protected abstract void verifyResultsDataPersistenceUponMissedNotify(
+		OneInputStreamTaskTestHarness<IN, IN> harness, OneInputStreamTask<IN, IN> task, S sink) throws Exception;
+
+	protected abstract void verifyResultsDataDiscardingUponRestore(
+		OneInputStreamTaskTestHarness<IN, IN> harness, OneInputStreamTask<IN, IN> task, S sink) throws Exception;
+
+	@Test
+	public void testIdealCircumstances() throws Exception {
+		OperatorExposingTask<IN> task = createTask();
+		TypeInformation<IN> info = createTypeInfo();
+		OneInputStreamTaskTestHarness<IN, IN> testHarness = new OneInputStreamTaskTestHarness<>(task, 1, 1, info, info);
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		streamConfig.setStreamOperator(createSink());
+
+		int elementCounter = 1;
+
+		testHarness.invoke();
+		testHarness.waitForTaskRunning();
+
+		ArrayList<StreamTaskState> states = new ArrayList<>();
+
+		for (int x = 0; x < 20; x++) {
+			testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+			elementCounter++;
+		}
+		testHarness.waitForInputProcessing();
+
+		states.add(copyTaskState(task.getOperator().snapshotOperatorState(states.size(), 0)));
+		task.notifyCheckpointComplete(states.size() - 1);
+
+		for (int x = 0; x < 20; x++) {
+			testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 1)));
+			elementCounter++;
+		}
+		testHarness.waitForInputProcessing();
+
+		states.add(copyTaskState(task.getOperator().snapshotOperatorState(states.size(), 0)));
+		task.notifyCheckpointComplete(states.size() - 1);
+
+		for (int x = 0; x < 20; x++) {
+			testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2)));
+			elementCounter++;
+		}
+		testHarness.waitForInputProcessing();
+
+		states.add(copyTaskState(task.getOperator().snapshotOperatorState(states.size(), 0)));
+		task.notifyCheckpointComplete(states.size() - 1);
+
+		testHarness.endInput();
+
+		states.add(copyTaskState(task.getOperator().snapshotOperatorState(states.size(), 0)));
+		testHarness.waitForTaskCompletion();
+
+		verifyResultsIdealCircumstances(testHarness, task, (S) task.getOperator());
+	}
+
+	@Test
+	public void testDataPersistenceUponMissedNotify() throws Exception {
+		S sink = createSink();
+		OperatorExposingTask<IN> task = createTask();
+		TypeInformation<IN> info = createTypeInfo();
+		OneInputStreamTaskTestHarness<IN, IN> testHarness = new OneInputStreamTaskTestHarness<>(task, 1, 1, info, info);
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		streamConfig.setStreamOperator(sink);
+
+		int elementCounter = 1;
+
+		testHarness.invoke();
+		testHarness.waitForTaskRunning();
+
+		ArrayList<StreamTaskState> states = new ArrayList<>();
+
+		for (int x = 0; x < 20; x++) {
+			testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+			elementCounter++;
+		}
+		testHarness.waitForInputProcessing();
+		states.add(copyTaskState(task.getOperator().snapshotOperatorState(states.size(), 0)));
+		task.notifyCheckpointComplete(states.size() - 1);
+
+		for (int x = 0; x < 20; x++) {
+			testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 1)));
+			elementCounter++;
+		}
+		testHarness.waitForInputProcessing();
+		states.add(copyTaskState(task.getOperator().snapshotOperatorState(states.size(), 0)));
+
+		for (int x = 0; x < 20; x++) {
+			testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2)));
+			elementCounter++;
+		}
+		testHarness.waitForInputProcessing();
+		states.add(copyTaskState(task.getOperator().snapshotOperatorState(states.size(), 0)));
+		task.notifyCheckpointComplete(states.size() - 1);
+
+		testHarness.endInput();
+
+		states.add(copyTaskState(task.getOperator().snapshotOperatorState(states.size(), 0)));
+		testHarness.waitForTaskCompletion();
+
+		verifyResultsDataPersistenceUponMissedNotify(testHarness, task, (S) task.getOperator());
+	}
+
+	@Test
+	public void testDataDiscardingUponRestore() throws Exception {
+		S sink = createSink();
+		OperatorExposingTask<IN> task = createTask();
+		TypeInformation<IN> info = createTypeInfo();
+		OneInputStreamTaskTestHarness<IN, IN> testHarness = new OneInputStreamTaskTestHarness<>(task, 1, 1, info, info);
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		streamConfig.setStreamOperator(sink);
+
+		int elementCounter = 1;
+
+		testHarness.invoke();
+		testHarness.waitForTaskRunning();
+
+		ArrayList<StreamTaskState> states = new ArrayList<>();
+
+		for (int x = 0; x < 20; x++) {
+			testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+			elementCounter++;
+		}
+		testHarness.waitForInputProcessing();
+		states.add(copyTaskState(task.getOperator().snapshotOperatorState(states.size(), 0)));
+		task.notifyCheckpointComplete(states.size() - 1);
+
+		for (int x = 0; x < 20; x++) {
+			testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 1)));
+			elementCounter++;
+		}
+		testHarness.waitForInputProcessing();
+		task.getOperator().restoreState(states.get(states.size() - 1), 0);
+
+		for (int x = 0; x < 20; x++) {
+			testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2)));
+			elementCounter++;
+		}
+		testHarness.waitForInputProcessing();
+		states.add(copyTaskState(task.getOperator().snapshotOperatorState(states.size(), 0)));
+		task.notifyCheckpointComplete(states.size() - 1);
+
+		testHarness.endInput();
+
+		states.add(copyTaskState(task.getOperator().snapshotOperatorState(states.size(), 0)));
+		testHarness.waitForTaskCompletion();
+
+		verifyResultsDataDiscardingUponRestore(testHarness, task, (S) task.getOperator());
+	}
+
+	private StreamTaskState copyTaskState(StreamTaskState toCopy) throws IOException, ClassNotFoundException {
+		synchronized (toCopy.getFunctionState()) {
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			ObjectOutputStream oos = new ObjectOutputStream(baos);
+			oos.writeObject(toCopy);
+
+			ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+			ObjectInputStream ois = new ObjectInputStream(bais);
+			return (StreamTaskState) ois.readObject();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSinkTest.java
new file mode 100644
index 0000000..72c71ad
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSinkTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class GenericAtLeastOnceSinkTest extends AtLeastOnceSinkTestBase<Tuple1<Integer>, GenericAtLeastOnceSinkTest.ListSink> {
+	@Override
+	protected ListSink createSink() throws Exception {
+		return new ListSink();
+	}
+
+	@Override
+	protected TupleTypeInfo<Tuple1<Integer>> createTypeInfo() {
+		return TupleTypeInfo.getBasicTupleTypeInfo(Integer.class);
+	}
+
+	@Override
+	protected Tuple1<Integer> generateValue(int counter, int checkpointID) {
+		return new Tuple1<>(counter);
+	}
+
+	@Override
+	protected void verifyResultsIdealCircumstances(
+		OneInputStreamTaskTestHarness<Tuple1<Integer>, Tuple1<Integer>> harness,
+		OneInputStreamTask<Tuple1<Integer>, Tuple1<Integer>> task, ListSink sink) {
+
+		ArrayList<Integer> list = new ArrayList<>();
+		for (int x = 1; x <= 60; x++) {
+			list.add(x);
+		}
+
+		for (Integer i : sink.values) {
+			list.remove(i);
+		}
+		Assert.assertTrue("The following IDs were not found in the result list: " + list.toString(), list.isEmpty());
+		Assert.assertTrue("The sink emitted too many values: " + (sink.values.size() - 60), sink.values.size() == 60);
+	}
+
+	@Override
+	protected void verifyResultsDataPersistenceUponMissedNotify(
+		OneInputStreamTaskTestHarness<Tuple1<Integer>, Tuple1<Integer>> harness,
+		OneInputStreamTask<Tuple1<Integer>, Tuple1<Integer>> task, ListSink sink) {
+
+		ArrayList<Integer> list = new ArrayList<>();
+		for (int x = 1; x <= 60; x++) {
+			list.add(x);
+		}
+
+		for (Integer i : sink.values) {
+			list.remove(i);
+		}
+		Assert.assertTrue("The following IDs were not found in the result list: " + list.toString(), list.isEmpty());
+		Assert.assertTrue("The sink emitted too many values: " + (sink.values.size() - 60), sink.values.size() == 60);
+	}
+
+	@Override
+	protected void verifyResultsDataDiscardingUponRestore(
+		OneInputStreamTaskTestHarness<Tuple1<Integer>, Tuple1<Integer>> harness,
+		OneInputStreamTask<Tuple1<Integer>, Tuple1<Integer>> task, ListSink sink) {
+
+		ArrayList<Integer> list = new ArrayList<>();
+		for (int x = 1; x <= 20; x++) {
+			list.add(x);
+		}
+		for (int x = 41; x <= 60; x++) {
+			list.add(x);
+		}
+
+		for (Integer i : sink.values) {
+			list.remove(i);
+		}
+		Assert.assertTrue("The following IDs were not found in the result list: " + list.toString(), list.isEmpty());
+		Assert.assertTrue("The sink emitted too many values: " + (sink.values.size() - 40), sink.values.size() == 40);
+	}
+
+	/**
+	 * Simple sink that stores all records in a public list.
+	 */
+	public static class ListSink extends GenericAtLeastOnceSink<Tuple1<Integer>> {
+		public List<Integer> values = new ArrayList<>();
+
+		public ListSink() throws Exception {
+			super(new SimpleCommitter(), TypeExtractor.getForObject(new Tuple1<>(1)).createSerializer(new ExecutionConfig()), "job");
+		}
+
+		@Override
+		protected void sendValue(Iterable<Tuple1<Integer>> values, long timestamp) throws Exception {
+			for (Tuple1<Integer> value : values) {
+				this.values.add(value.f0);
+			}
+		}
+	}
+
+	public static class SimpleCommitter extends CheckpointCommitter {
+		private List<Long> checkpoints;
+
+		@Override
+		public void open() throws Exception {
+			checkpoints = new ArrayList<>();
+		}
+
+		@Override
+		public void close() throws Exception {
+			checkpoints.clear();
+			checkpoints = null;
+		}
+
+		@Override
+		public void createResource() throws Exception {
+		}
+
+		@Override
+		public void commitCheckpoint(long checkpointID) {
+			checkpoints.add(checkpointID);
+		}
+
+		@Override
+		public boolean isCheckpointCommitted(long checkpointID) {
+			return checkpoints.contains(checkpointID);
+		}
+	}
+}