Posted to commits@flink.apache.org by rm...@apache.org on 2016/06/16 14:58:31 UTC

[2/4] flink git commit: [FLINK-3332] Address pull request review comments

[FLINK-3332] Address pull request review comments

This closes #1771


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b63e19b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b63e19b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b63e19b0

Branch: refs/heads/master
Commit: b63e19b090cd9a1558db4c68c385e0ce947789e9
Parents: 5930622
Author: zentol <ch...@apache.org>
Authored: Wed Mar 9 18:20:43 2016 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 16 16:58:04 2016 +0200

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 docs/apis/streaming/connectors/cassandra.md     |  30 ++-
 .../api/writer/ResultPartitionWriter.java       |   2 +-
 .../flink-connector-cassandra/pom.xml           |   3 +-
 .../cassandra/CassandraInputFormat.java         |   4 +-
 .../cassandra/CassandraOutputFormat.java        |   4 +-
 .../cassandra/CassandraAtLeastOnceSink.java     |  91 --------
 .../cassandra/CassandraCommitter.java           |  34 +--
 .../CassandraIdempotentExactlyOnceSink.java     | 116 ----------
 .../cassandra/CassandraPojoAtLeastOnceSink.java |  61 -----
 .../connectors/cassandra/CassandraPojoSink.java |  61 +++++
 .../connectors/cassandra/CassandraSink.java     |  89 ++++---
 .../connectors/cassandra/CassandraSinkBase.java |  94 ++++++++
 .../CassandraTupleAtLeastOnceSink.java          |  59 -----
 .../cassandra/CassandraTupleSink.java           |  59 +++++
 .../cassandra/CassandraTupleWriteAheadSink.java | 138 +++++++++++
 .../connectors/cassandra/ClusterBuilder.java    |  12 +
 .../cassandra/example/BatchExample.java         |   6 +
 .../cassandra/CassandraConnectorTest.java       | 128 +++++++---
 ...ssandraIdempotentExactlyOnceSinkExample.java |  88 -------
 .../CassandraPojoAtLeastOnceSinkExample.java    |  57 -----
 .../example/CassandraPojoSinkExample.java       |  62 +++++
 .../CassandraTupleAtLeastOnceSinkExample.java   |  59 -----
 .../example/CassandraTupleSinkExample.java      |  62 +++++
 .../CassandraTupleWriteAheadSinkExample.java    |  96 ++++++++
 .../src/test/resources/cassandra.yaml           |   6 +-
 .../src/test/resources/log4j-test.properties    |   4 +-
 .../operators/GenericAtLeastOnceSink.java       | 197 ----------------
 .../operators/GenericWriteAheadSink.java        | 216 +++++++++++++++++
 .../operators/AtLeastOnceSinkTestBase.java      | 218 -----------------
 .../operators/GenericAtLeastOnceSinkTest.java   | 147 ------------
 .../operators/GenericWriteAheadSinkTest.java    | 153 ++++++++++++
 .../operators/WriteAheadSinkTestBase.java       | 231 +++++++++++++++++++
 .../runtime/tasks/StreamTaskTestHarness.java    |   7 +
 34 files changed, 1403 insertions(+), 1193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index f6fdb78..57f4a52 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -25,7 +25,7 @@ matrix:
     - jdk: "oraclejdk7" # this uploads the Hadoop 2 build to Maven and S3
       env: PROFILE="-Dhadoop.version=2.3.0"
     - jdk: "openjdk7" # this uploads the Hadoop 1 build to Maven and S3
-      env: PROFILE="-Dhadoop.profile=1"
+      env: PROFILE="-Dhadoop.profile=1" 
 
 
 git:

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/docs/apis/streaming/connectors/cassandra.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/cassandra.md b/docs/apis/streaming/connectors/cassandra.md
index 2497a8e..18732fb 100644
--- a/docs/apis/streaming/connectors/cassandra.md
+++ b/docs/apis/streaming/connectors/cassandra.md
@@ -51,22 +51,34 @@ This method returns a CassandraSinkBuilder, which offers methods to further conf
 The following configuration methods can be used:
 
 1. setQuery(String query)
-2. setConsistencyLevel(ConsistencyLevel level)
+2. setHost(String host[, int port])
 3. setClusterBuilder(ClusterBuilder builder)
-4. setCheckpointCommitter(CheckpointCommitter committer)
+4. enableWriteAheadLog([CheckpointCommitter committer])
 5. build()
 
 setQuery() sets the query that is executed for every value the sink receives.
-setConsistencyLevel() sets the desired consistency level (AT_LEAST_ONCE / EXACTLY_ONCE).
-setClusterBuilder() sets the cluster builder that is used to configure the connection to cassandra.
-setCheckpointCommitter() is an optional method for EXACTLY_ONCE processing.
+setHost() sets the cassandra host/port to connect to. This method is intended for simple use cases.
+setClusterBuilder() sets the cluster builder that is used to configure the connection to cassandra. This method subsumes the setHost() functionality.
+enableWriteAheadLog() is an optional method that allows exactly-once processing for non-deterministic algorithms.
 
 A checkpoint committer stores additional information about completed checkpoints
-in some resource. You can use a `CassandraCommitter` to store these in a separate
-table in cassandra. Note that this table will NOT be cleaned up by Flink.
+in some resource. This information is used to prevent a full replay of the last
+completed checkpoint in case of a failure.
+You can use a `CassandraCommitter` to store these in a separate table in cassandra.
+Note that this table will NOT be cleaned up by Flink.
 
 build() finalizes the configuration and returns the CassandraSink.
 
+Flink can provide exactly-once guarantees if the query is idempotent (meaning it can be applied multiple
+times without changing the result) and checkpointing is enabled. In case of a failure, the failed
+checkpoint will be replayed completely.
+
+Furthermore, for non-deterministic programs the write-ahead log has to be enabled. For such a program
+the replayed checkpoint may be completely different from the previous attempt, which may leave the
+database in an inconsistent state since part of the first attempt may already have been written.
+The write-ahead log guarantees that the replayed checkpoint is identical to the first attempt.
+Note that enabling this feature will have an adverse impact on latency.
+
 Example:
 
 <div class="codetabs" markdown="1">
@@ -74,8 +86,6 @@ Example:
 {% highlight java %}
 CassandraSink.addSink(input)
   .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setConsistencyLevel(CassandraSink.ConsistencyLevel.EXACTLY_ONCE)
-  .setCheckpointCommitter(new CassandraCommitter())
   .setClusterBuilder(new ClusterBuilder() {
     @Override
     public Cluster buildCluster(Cluster.Builder builder) {
@@ -89,8 +99,6 @@ CassandraSink.addSink(input)
 {% highlight scala %}
 CassandraSink.addSink(input)
   .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setConsistencyLevel(CassandraSink.ConsistencyLevel.EXACTLY_ONCE)
-  .setCheckpointCommitter(new CassandraCommitter())
   .setClusterBuilder(new ClusterBuilder() {
     @Override
     public Cluster buildCluster(Cluster.Builder builder) {

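For reference, below is a minimal end-to-end sketch of the new builder API documented above.
The contact point, keyspace and table names are illustrative placeholders (the table is assumed
to exist), and the write-ahead log path additionally requires checkpointing to be enabled:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

public class CassandraSinkUsageSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// the write-ahead log requires checkpointing to be enabled
		env.enableCheckpointing(1000);

		DataStream<Tuple2<String, Integer>> input =
				env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));

		CassandraSink.addSink(input)
				.setQuery("INSERT INTO example.values (id, counter) VALUES (?, ?);")
				.setHost("127.0.0.1") // or setClusterBuilder(...), but not both
				.enableWriteAheadLog() // optional; a default CassandraCommitter is created
				.build();

		env.execute("Cassandra sink sketch");
	}
}
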
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index cfab34d..79c21c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -37,7 +37,7 @@ import java.io.IOException;
  * The {@link ResultPartitionWriter} is the runtime API for producing results. It
  * supports two kinds of data to be sent: buffers and events.
  */
-public class ResultPartitionWriter implements EventListener<TaskEvent> {
+public final class ResultPartitionWriter implements EventListener<TaskEvent> {
 
 	private final ResultPartition partition;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/pom.xml b/flink-streaming-connectors/flink-connector-cassandra/pom.xml
index 9d8b24d..bfd712b 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-streaming-connectors/flink-connector-cassandra/pom.xml
@@ -49,6 +49,7 @@ under the License.
 				<configuration>
 					<reuseForks>true</reuseForks>
 					<forkCount>1</forkCount>
+					<argLine>-Xms256m -Xmx2800m -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
 				</configuration>
 			</plugin>
 			<plugin>
@@ -126,7 +127,7 @@ under the License.
 		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
-			<version>16.0.1</version>
+			<version>${guava.version}</version>
 		</dependency>
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
index 0c027dc..6818288 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
@@ -117,13 +117,13 @@ public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT
 		try {
 			session.close();
 		} catch (Exception e) {
-			LOG.info("Inputformat couldn't be closed - " + e.getMessage());
+			LOG.info("Inputformat couldn't be closed.", e);
 		}
 
 		try {
 			cluster.close();
 		} catch (Exception e) {
-			LOG.info("Inputformat couldn't be closed - " + e.getMessage());
+			LOG.info("Inputformat couldn't be closed.", e);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
index 433cfba..116db89 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
@@ -111,13 +111,13 @@ public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<O
 		try {
 			session.close();
 		} catch (Exception e) {
-			LOG.info("Inputformat couldn't be closed - " + e.getMessage());
+			LOG.warn("Outputformat couldn't be closed.", e);
 		}
 
 		try {
 			cluster.close();
 		} catch (Exception e) {
-			LOG.info("Inputformat couldn't be closed - " + e.getMessage());
+			LOG.warn("Outputformat couldn't be closed.", e);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAtLeastOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAtLeastOnceSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAtLeastOnceSink.java
deleted file mode 100644
index 602ec74..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAtLeastOnceSink.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * CassandraAtLeastOnceSink is the common abstract class of {@link CassandraPojoAtLeastOnceSink} and {@link CassandraTupleAtLeastOnceSink}.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public abstract class CassandraAtLeastOnceSink<IN, V> extends RichSinkFunction<IN> {
-	protected static final Logger LOG = LoggerFactory.getLogger(CassandraAtLeastOnceSink.class);
-	protected transient Cluster cluster;
-	protected transient Session session;
-
-	protected transient Throwable exception = null;
-	protected transient FutureCallback<V> callback;
-
-	private final ClusterBuilder builder;
-
-	protected CassandraAtLeastOnceSink(ClusterBuilder builder) {
-		this.builder = builder;
-	}
-
-	@Override
-	public void open(Configuration configuration) {
-		this.callback = new FutureCallback<V>() {
-			@Override
-			public void onSuccess(V ignored) {
-			}
-
-			@Override
-			public void onFailure(Throwable t) {
-				exception = t;
-			}
-		};
-		this.cluster = builder.getCluster();
-		this.session = cluster.connect();
-	}
-
-	@Override
-	public void invoke(IN value) throws Exception {
-		if (exception != null) {
-			throw new IOException("invoke() failed", exception);
-		}
-		ListenableFuture<V> result = send(value);
-		Futures.addCallback(result, callback);
-	}
-
-	public abstract ListenableFuture<V> send(IN value);
-
-	@Override
-	public void close() {
-		try {
-			session.close();
-		} catch (Exception e) {
-			LOG.error("Error while closing session.", e);
-		}
-		try {
-			cluster.close();
-		} catch (Exception e) {
-			LOG.error("Error while closing cluster.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
index c5d47e7..caab188 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
@@ -34,18 +34,24 @@ public class CassandraCommitter extends CheckpointCommitter {
 	private transient Cluster cluster;
 	private transient Session session;
 
-	private static final String KEYSPACE = "flink_auxiliary";
-	private String TABLE = "checkpoints_";
+	private String keySpace = "flink_auxiliary";
+	private String table = "checkpoints_";
 
-	private transient PreparedStatement deleteStatement;
 	private transient PreparedStatement updateStatement;
 	private transient PreparedStatement selectStatement;
 
+	private long lastCommittedCheckpointID = -1;
+
 	public CassandraCommitter(ClusterBuilder builder) {
 		this.builder = builder;
 		ClosureCleaner.clean(builder, true);
 	}
 
+	public CassandraCommitter(ClusterBuilder builder, String keySpace) {
+		this(builder);
+		this.keySpace = keySpace;
+	}
+
 	/**
 	 * Internally used to set the job ID after instantiation.
 	 *
@@ -54,7 +60,7 @@ public class CassandraCommitter extends CheckpointCommitter {
 	 */
 	public void setJobId(String id) throws Exception {
 		super.setJobId(id);
-		TABLE += id;
+		table += id;
 	}
 
 	/**
@@ -68,8 +74,8 @@ public class CassandraCommitter extends CheckpointCommitter {
 		cluster = builder.getCluster();
 		session = cluster.connect();
 
-		session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", KEYSPACE));
-		session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", KEYSPACE, TABLE));
+		session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':1};", keySpace));
+		session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table));
 
 		try {
 			session.close();
@@ -91,16 +97,15 @@ public class CassandraCommitter extends CheckpointCommitter {
 		cluster = builder.getCluster();
 		session = cluster.connect();
 
-		deleteStatement = session.prepare(String.format("DELETE FROM %s.%s where sink_id='%s' and sub_id=%d;", KEYSPACE, TABLE, operatorId, subtaskId));
-		updateStatement = session.prepare(String.format("UPDATE %s.%s set checkpoint_id=? where sink_id='%s' and sub_id=%d;", KEYSPACE, TABLE, operatorId, subtaskId));
-		selectStatement = session.prepare(String.format("SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", KEYSPACE, TABLE, operatorId, subtaskId));
+		updateStatement = session.prepare(String.format("UPDATE %s.%s set checkpoint_id=? where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
+		selectStatement = session.prepare(String.format("SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
 
-		session.execute(String.format("INSERT INTO %s.%s (sink_id, sub_id, checkpoint_id) values ('%s', %d, " + -1 + ");", KEYSPACE, TABLE, operatorId, subtaskId));
+		session.execute(String.format("INSERT INTO %s.%s (sink_id, sub_id, checkpoint_id) values ('%s', %d, " + -1 + ") IF NOT EXISTS;", keySpace, table, operatorId, subtaskId));
 	}
 
 	@Override
 	public void close() throws Exception {
-		session.executeAsync(deleteStatement.bind());
+		this.lastCommittedCheckpointID = -1;
 		try {
 			session.close();
 		} catch (Exception e) {
@@ -116,11 +121,14 @@ public class CassandraCommitter extends CheckpointCommitter {
 	@Override
 	public void commitCheckpoint(long checkpointID) {
 		session.execute(updateStatement.bind(checkpointID));
+		this.lastCommittedCheckpointID = checkpointID;
 	}
 
 	@Override
 	public boolean isCheckpointCommitted(long checkpointID) {
-		long lastId = session.execute(selectStatement.bind()).one().getLong("checkpoint_id");
-		return checkpointID <= lastId;
+		if (this.lastCommittedCheckpointID == -1) {
+			this.lastCommittedCheckpointID = session.execute(selectStatement.bind()).one().getLong("checkpoint_id");
+		}
+		return checkpointID <= this.lastCommittedCheckpointID;
 	}
 }

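The new two-argument constructor above allows the checkpoint metadata to be stored in a custom
keyspace instead of the default "flink_auxiliary". A hedged wiring sketch (keyspace name and
contact point are placeholders; 'input' stands for a tuple stream with checkpointing enabled):

import com.datastax.driver.core.Cluster;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

ClusterBuilder clusterBuilder = new ClusterBuilder() {
	@Override
	protected Cluster buildCluster(Cluster.Builder builder) {
		return builder.addContactPoint("127.0.0.1").build();
	}
};

// checkpoint IDs end up in my_flink_aux.checkpoints_<jobID> rather than flink_auxiliary
CassandraCommitter committer = new CassandraCommitter(clusterBuilder, "my_flink_aux");

CassandraSink.addSink(input)
		.setQuery("INSERT INTO example.values (id, counter) VALUES (?, ?);")
		.setClusterBuilder(clusterBuilder)
		.enableWriteAheadLog(committer)
		.build();
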
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraIdempotentExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraIdempotentExactlyOnceSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraIdempotentExactlyOnceSink.java
deleted file mode 100644
index b9de9c4..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraIdempotentExactlyOnceSink.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Session;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-import org.apache.flink.streaming.runtime.operators.GenericAtLeastOnceSink;
-
-/**
- * Sink that emits its input elements into a Cassandra database. This sink is integrated with the checkpointing
- * mechanism and provides exactly-once guarantees for idempotent updates.
- * <p/>
- * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
- * checkpoint is completed.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public class CassandraIdempotentExactlyOnceSink<IN extends Tuple> extends GenericAtLeastOnceSink<IN> {
-	protected transient Cluster cluster;
-	protected transient Session session;
-
-	private final String insertQuery;
-	private transient PreparedStatement preparedStatement;
-
-	private transient Throwable exception = null;
-	private transient FutureCallback<ResultSet> callback;
-
-	private ClusterBuilder builder;
-
-	protected CassandraIdempotentExactlyOnceSink(String insertQuery, TypeSerializer<IN> serializer, ClusterBuilder builder, String jobID, CheckpointCommitter committer) throws Exception {
-		super(committer, serializer, jobID);
-		this.insertQuery = insertQuery;
-		this.builder = builder;
-		ClosureCleaner.clean(builder, true);
-	}
-
-	public void open() throws Exception {
-		super.open();
-		this.callback = new FutureCallback<ResultSet>() {
-			@Override
-			public void onSuccess(ResultSet resultSet) {
-			}
-
-			@Override
-			public void onFailure(Throwable throwable) {
-				exception = throwable;
-			}
-		};
-		cluster = builder.getCluster();
-		session = cluster.connect();
-		preparedStatement = session.prepare(insertQuery);
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-		try {
-			session.close();
-		} catch (Exception e) {
-			LOG.error("Error while closing session.", e);
-		}
-		try {
-			cluster.close();
-		} catch (Exception e) {
-			LOG.error("Error while closing cluster.", e);
-		}
-	}
-
-	@Override
-	protected void sendValue(Iterable<IN> values, long timestamp) throws Exception {
-		//verify that no query failed until now
-		if (exception != null) {
-			throw new Exception(exception);
-		}
-		//set values for prepared statement
-		for (IN value : values) {
-			Object[] fields = new Object[value.getArity()];
-			for (int x = 0; x < value.getArity(); x++) {
-				fields[x] = value.getField(x);
-			}
-			//insert values and send to cassandra
-			BoundStatement s = preparedStatement.bind(fields);
-			s.setDefaultTimestamp(timestamp);
-			ResultSetFuture result = session.executeAsync(s);
-			if (result != null) {
-				//add callback to detect errors
-				Futures.addCallback(result, callback);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoAtLeastOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoAtLeastOnceSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoAtLeastOnceSink.java
deleted file mode 100644
index 45ac61d..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoAtLeastOnceSink.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.mapping.Mapper;
-import com.datastax.driver.mapping.MappingManager;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * Flink Sink to save data into a Cassandra cluster using {@link Mapper}, which
- * it uses annotations from {@link com.datastax.driver.mapping}.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public class CassandraPojoAtLeastOnceSink<IN> extends CassandraAtLeastOnceSink<IN, Void> {
-	protected Class<IN> clazz;
-	protected transient Mapper<IN> mapper;
-	protected transient MappingManager mappingManager;
-
-	/**
-	 * The main constructor for creating CassandraPojoAtLeastOnceSink
-	 *
-	 * @param clazz Class<IN> instance
-	 */
-	public CassandraPojoAtLeastOnceSink(Class<IN> clazz, ClusterBuilder builder) {
-		super(builder);
-		this.clazz = clazz;
-	}
-
-	@Override
-	public void open(Configuration configuration) {
-		super.open(configuration);
-		try {
-			this.mappingManager = new MappingManager(session);
-			this.mapper = mappingManager.mapper(clazz);
-		} catch (Exception e) {
-			throw new RuntimeException("Cannot create CassandraPojoAtLeastOnceSink with input: " + clazz.getSimpleName(), e);
-		}
-	}
-
-	@Override
-	public ListenableFuture<Void> send(IN value) {
-		return mapper.saveAsync(value);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
new file mode 100644
index 0000000..204a0f3
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Flink Sink to save data into a Cassandra cluster using {@link Mapper},
+ * which uses annotations from {@link com.datastax.driver.mapping}.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
+	protected Class<IN> clazz;
+	protected transient Mapper<IN> mapper;
+	protected transient MappingManager mappingManager;
+
+	/**
+	 * The main constructor for creating CassandraPojoSink
+	 *
+	 * @param clazz Class<IN> instance
+	 */
+	public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) {
+		super(builder);
+		this.clazz = clazz;
+	}
+
+	@Override
+	public void open(Configuration configuration) {
+		super.open(configuration);
+		try {
+			this.mappingManager = new MappingManager(session);
+			this.mapper = mappingManager.mapper(clazz);
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot create CassandraPojoSink with input: " + clazz.getSimpleName(), e);
+		}
+	}
+
+	@Override
+	public ListenableFuture<Void> send(IN value) {
+		return mapper.saveAsync(value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index e7804f4..c01f392 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -30,15 +30,12 @@ import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
 
-import java.util.UUID;
-
 /**
  * This class wraps different Cassandra sink implementations to provide a common interface for all of them.
  *
  * @param <IN> input type
  */
 public class CassandraSink<IN> {
-	private static final String jobID = UUID.randomUUID().toString().replace("-", "_");
 	private final boolean useDataStreamSink;
 	private DataStreamSink<IN> sink1;
 	private SingleOutputStreamOperator<IN> sink2;
@@ -170,19 +167,14 @@ public class CassandraSink<IN> {
 		}
 	}
 
-	public enum ConsistencyLevel {
-		At_LEAST_ONCE,
-		EXACTLY_ONCE
-	}
-
 	public abstract static class CassandraSinkBuilder<IN> {
 		protected final DataStream<IN> input;
 		protected final TypeSerializer<IN> serializer;
 		protected final TypeInformation<IN> typeInfo;
-		protected ConsistencyLevel consistency = ConsistencyLevel.At_LEAST_ONCE;
 		protected ClusterBuilder builder;
 		protected String query;
 		protected CheckpointCommitter committer;
+		protected boolean isWriteAheadLogEnabled;
 
 		public CassandraSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
 			this.input = input;
@@ -192,7 +184,6 @@ public class CassandraSink<IN> {
 
 		/**
 		 * Sets the query that is to be executed for every record.
-		 * This parameter is mandatory.
 		 *
 		 * @param query query to use
 		 * @return this builder
@@ -202,12 +193,28 @@ public class CassandraSink<IN> {
 			return this;
 		}
 
+		/**
+		 * Sets the cassandra host to connect to.
+		 *
+		 * @param host host to connect to
+		 * @return this builder
+		 */
 		public CassandraSinkBuilder<IN> setHost(String host) {
 			return setHost(host, 9042);
 		}
 
+		/**
+		 * Sets the cassandra host/port to connect to.
+		 *
+		 * @param host host to connect to
+		 * @param port port to connect to
+		 * @return this builder
+		 */
 		public CassandraSinkBuilder<IN> setHost(final String host, final int port) {
-			builder = new ClusterBuilder() {
+			if (this.builder != null) {
+				throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder().");
+			}
+			this.builder = new ClusterBuilder() {
 				@Override
 				protected Cluster buildCluster(Cluster.Builder builder) {
 					return builder.addContactPoint(host).withPort(port).build();
@@ -217,39 +224,40 @@ public class CassandraSink<IN> {
 		}
 
 		/**
-		 * Specifies the desired consistency level for this sink. Different sink implementations may be used depending
-		 * on this parameter.
-		 * This parameter is mandatory.
+		 * Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the connection to cassandra.
 		 *
-		 * @param consistency desired consistency level
+		 * @param builder ClusterBuilder to configure the connection to cassandra
 		 * @return this builder
 		 */
-		public CassandraSinkBuilder<IN> setConsistencyLevel(ConsistencyLevel consistency) {
-			this.consistency = consistency;
+		public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) {
+			if (this.builder != null) {
+				throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder().");
+			}
+			this.builder = builder;
 			return this;
 		}
 
 		/**
-		 * Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the connection to cassandra.
-		 * This field is mandatory.
+		 * Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use
+		 * idempotent updates.
 		 *
-		 * @param builder ClusterBuilder to configure the connection to cassandra
 		 * @return this builder
 		 */
-		public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) {
-			this.builder = builder;
+		public CassandraSinkBuilder<IN> enableWriteAheadLog() {
+			this.isWriteAheadLogEnabled = true;
 			return this;
 		}
 
 		/**
-		 * Sets the CheckpointCommitter for this sink. A CheckpointCommitter stores information about completed checkpoints
-		 * in a resource outside of Flink.
-		 * If the desired consistency level is EXACTLY_ONCE and this field is not set, a default committer will be used.
+		 * Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use
+		 * idempotent updates.
 		 *
-		 * @param committer
+		 * @param committer CheckpointCommitter that stores information about completed checkpoints in an external
+		 *                  resource. By default this information is stored in a separate table within Cassandra.
 		 * @return this builder
 		 */
-		public CassandraSinkBuilder<IN> setCheckpointCommitter(CheckpointCommitter committer) {
+		public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer) {
+			this.isWriteAheadLogEnabled = true;
 			this.committer = committer;
 			return this;
 		}
@@ -261,6 +269,15 @@ public class CassandraSink<IN> {
 		 * @throws Exception
 		 */
 		public abstract CassandraSink<IN> build() throws Exception;
+
+		protected void sanityCheck() {
+			if (query == null || query.length() == 0) {
+				throw new IllegalArgumentException("Query must not be null or empty.");
+			}
+			if (builder == null) {
+				throw new IllegalArgumentException("Cassandra host information must be supplied using either setHost() or setClusterBuilder().");
+			}
+		}
 	}
 
 	public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
@@ -270,12 +287,13 @@ public class CassandraSink<IN> {
 
 		@Override
 		public CassandraSink<IN> build() throws Exception {
-			if (consistency == ConsistencyLevel.EXACTLY_ONCE) {
+			sanityCheck();
+			if (isWriteAheadLogEnabled) {
 				return committer == null
-					? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraIdempotentExactlyOnceSink<>(query, serializer, builder, jobID, new CassandraCommitter(builder))))
-					: new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraIdempotentExactlyOnceSink<>(query, serializer, builder, jobID, committer)));
+					? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, new CassandraCommitter(builder))))
+					: new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, committer)));
 			} else {
-				return new CassandraSink<>(input.addSink(new CassandraTupleAtLeastOnceSink<IN>(query, builder)).name("Cassandra Sink"));
+				return new CassandraSink<>(input.addSink(new CassandraTupleSink<IN>(query, builder)).name("Cassandra Sink"));
 			}
 		}
 	}
@@ -287,13 +305,12 @@ public class CassandraSink<IN> {
 
 		@Override
 		public CassandraSink<IN> build() throws Exception {
-			if (consistency == ConsistencyLevel.EXACTLY_ONCE) {
+			sanityCheck();
+			if (isWriteAheadLogEnabled) {
 				throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
+			} else {
+				return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink"));
 			}
-			if (consistency == ConsistencyLevel.At_LEAST_ONCE) {
-				return new CassandraSink<>(input.addSink(new CassandraPojoAtLeastOnceSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink"));
-			}
-			throw new IllegalArgumentException("No consistency level was specified.");
 		}
 	}
 }

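The builder now fails fast on conflicting or missing connection information. A short sketch of
the new contract ('input' and 'clusterBuilder' are assumed to exist as in the examples above):

try {
	CassandraSink.addSink(input)
			.setHost("127.0.0.1")
			.setClusterBuilder(clusterBuilder); // throws immediately: builder already set via setHost()
} catch (IllegalArgumentException expected) {
	// "Builder was already set. You must use either setHost() or setClusterBuilder()."
}

try {
	CassandraSink.addSink(input)
			.setHost("127.0.0.1")
			.build(); // sanityCheck() rejects the missing query
} catch (Exception expected) {
	// IllegalArgumentException: "Query must not be null or empty."
}
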
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
new file mode 100644
index 0000000..c823f5b
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
+	protected static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBase.class);
+	protected transient Cluster cluster;
+	protected transient Session session;
+
+	protected transient Throwable exception = null;
+	protected transient FutureCallback<V> callback;
+
+	private final ClusterBuilder builder;
+
+	protected CassandraSinkBase(ClusterBuilder builder) {
+		this.builder = builder;
+		ClosureCleaner.clean(builder, true);
+	}
+
+	@Override
+	public void open(Configuration configuration) {
+		this.callback = new FutureCallback<V>() {
+			@Override
+			public void onSuccess(V ignored) {
+			}
+
+			@Override
+			public void onFailure(Throwable t) {
+				exception = t;
+				LOG.error("Error while sending value.", t);
+			}
+		};
+		this.cluster = builder.getCluster();
+		this.session = cluster.connect();
+	}
+
+	@Override
+	public void invoke(IN value) throws Exception {
+		if (exception != null) {
+			throw new IOException("invoke() failed", exception);
+		}
+		ListenableFuture<V> result = send(value);
+		Futures.addCallback(result, callback);
+	}
+
+	public abstract ListenableFuture<V> send(IN value);
+
+	@Override
+	public void close() {
+		try {
+			session.close();
+		} catch (Exception e) {
+			LOG.error("Error while closing session.", e);
+		}
+		try {
+			cluster.close();
+		} catch (Exception e) {
+			LOG.error("Error while closing cluster.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleAtLeastOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleAtLeastOnceSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleAtLeastOnceSink.java
deleted file mode 100644
index 9d6daea..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleAtLeastOnceSink.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * Flink Sink to save data into a Cassandra cluster.
- *
- * @param <IN> Type of the elements emitted by this sink, it must extend {@link Tuple}
- */
-public class CassandraTupleAtLeastOnceSink<IN extends Tuple> extends CassandraAtLeastOnceSink<IN, ResultSet> {
-	private final String insertQuery;
-	private transient PreparedStatement ps;
-
-	public CassandraTupleAtLeastOnceSink(String insertQuery, ClusterBuilder builder) {
-		super(builder);
-		this.insertQuery = insertQuery;
-	}
-
-	@Override
-	public void open(Configuration configuration) {
-		super.open(configuration);
-		this.ps = session.prepare(insertQuery);
-	}
-
-	@Override
-	public ListenableFuture<ResultSet> send(IN value) {
-		Object[] fields = extract(value);
-		return session.executeAsync(ps.bind(fields));
-	}
-
-	private Object[] extract(IN record) {
-		Object[] al = new Object[record.getArity()];
-		for (int i = 0; i < record.getArity(); i++) {
-			al[i] = record.getField(i);
-		}
-		return al;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
new file mode 100644
index 0000000..0a9ef06
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Flink Sink to save data into a Cassandra cluster.
+ *
+ * @param <IN> Type of the elements emitted by this sink, it must extend {@link Tuple}
+ */
+public class CassandraTupleSink<IN extends Tuple> extends CassandraSinkBase<IN, ResultSet> {
+	private final String insertQuery;
+	private transient PreparedStatement ps;
+
+	public CassandraTupleSink(String insertQuery, ClusterBuilder builder) {
+		super(builder);
+		this.insertQuery = insertQuery;
+	}
+
+	@Override
+	public void open(Configuration configuration) {
+		super.open(configuration);
+		this.ps = session.prepare(insertQuery);
+	}
+
+	@Override
+	public ListenableFuture<ResultSet> send(IN value) {
+		Object[] fields = extract(value);
+		return session.executeAsync(ps.bind(fields));
+	}
+
+	private Object[] extract(IN record) {
+		Object[] al = new Object[record.getArity()];
+		for (int i = 0; i < record.getArity(); i++) {
+			al[i] = record.getField(i);
+		}
+		return al;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
new file mode 100644
index 0000000..f784647
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink stores incoming records within a
+ * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to cassandra
+ * once a checkpoint is completed.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {
+	protected transient Cluster cluster;
+	protected transient Session session;
+
+	private final String insertQuery;
+	private transient PreparedStatement preparedStatement;
+
+	private transient Throwable exception = null;
+	private transient FutureCallback<ResultSet> callback;
+
+	private ClusterBuilder builder;
+
+	private int updatesSent = 0;
+	private AtomicInteger updatesConfirmed = new AtomicInteger(0);
+
+	private transient Object[] fields;
+
+	protected CassandraTupleWriteAheadSink(String insertQuery, TypeSerializer<IN> serializer, ClusterBuilder builder, CheckpointCommitter committer) throws Exception {
+		super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
+		this.insertQuery = insertQuery;
+		this.builder = builder;
+		ClosureCleaner.clean(builder, true);
+	}
+
+	public void open() throws Exception {
+		super.open();
+		if (!getRuntimeContext().isCheckpointingEnabled()) {
+			throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled.");
+		}
+		this.callback = new FutureCallback<ResultSet>() {
+			@Override
+			public void onSuccess(ResultSet resultSet) {
+				updatesConfirmed.incrementAndGet();
+			}
+
+			@Override
+			public void onFailure(Throwable throwable) {
+				exception = throwable;
+				LOG.error("Error while sending value.", throwable);
+			}
+		};
+		cluster = builder.getCluster();
+		session = cluster.connect();
+		preparedStatement = session.prepare(insertQuery);
+
+		fields = new Object[((TupleSerializer<IN>) serializer).getArity()];
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		try {
+			session.close();
+		} catch (Exception e) {
+			LOG.error("Error while closing session.", e);
+		}
+		try {
+			cluster.close();
+		} catch (Exception e) {
+			LOG.error("Error while closing cluster.", e);
+		}
+	}
+
+	@Override
+	protected void sendValues(Iterable<IN> values, long timestamp) throws Exception {
+		//verify that no query failed until now
+		if (exception != null) {
+			throw new Exception(exception);
+		}
+		//set values for prepared statement
+		for (IN value : values) {
+			for (int x = 0; x < value.getArity(); x++) {
+				fields[x] = value.getField(x);
+			}
+			//insert values and send to cassandra
+			BoundStatement s = preparedStatement.bind(fields);
+			s.setDefaultTimestamp(timestamp);
+			ResultSetFuture result = session.executeAsync(s);
+			updatesSent++;
+			if (result != null) {
+				//add callback to detect errors
+				Futures.addCallback(result, callback);
+			}
+		}
+		try {
+			while (updatesSent != updatesConfirmed.get()) {
+				Thread.sleep(100);
+			}
+		} catch (InterruptedException e) {
+		}
+		updatesSent = 0;
+		updatesConfirmed.set(0);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
index d3db435..01458f4 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
@@ -21,11 +21,23 @@ import com.datastax.driver.core.Cluster;
 
 import java.io.Serializable;
 
+/**
+ * This class is used to configure a {@link com.datastax.driver.core.Cluster} after deployment.
+ * The cluster represents the connection that will be established to Cassandra.
+ */
 public abstract class ClusterBuilder implements Serializable {
 
 	public Cluster getCluster() {
 		return buildCluster(Cluster.builder());
 	}
 
+	/**
+	 * Configures the connection to Cassandra.
+	 * The configuration is done by calling methods on the builder object
+	 * and finalizing the configuration with build().
+	 *
+	 * @param builder connection builder
+	 * @return configured connection
+	 */
 	protected abstract Cluster buildCluster(Cluster.Builder builder);
 }
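
A minimal sketch of a concrete ClusterBuilder, assuming a Cassandra node reachable on localhost at the default port:

	ClusterBuilder clusterBuilder = new ClusterBuilder() {
		@Override
		protected Cluster buildCluster(Cluster.Builder builder) {
			//contact point and port are placeholders for an actual deployment
			return builder
				.addContactPoint("127.0.0.1")
				.withPort(9042)
				.build();
		}
	};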

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
index 396444a..e66b8b3 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
@@ -29,6 +29,12 @@ import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 
 import java.util.ArrayList;
 
+/**
+ * This is an example showing how to use the Cassandra Input-/OutputFormats in the Batch API.
+ *
+ * The example assumes that a table exists in a local Cassandra database, according to the following query:
+ * CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));
+ */
 public class BatchExample {
 	private static final String INSERT_QUERY = "INSERT INTO test.batches (number, strings) VALUES (?,?);";
 	private static final String SELECT_QUERY = "SELECT number, strings FROM test.batches;";

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
index a4a23a1..83bb37a 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.connectors.cassandra;
 
 import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
@@ -26,25 +28,33 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.runtime.operators.AtLeastOnceSinkTestBase;
+import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -54,7 +64,11 @@ import java.util.ArrayList;
 import java.util.Scanner;
 import java.util.UUID;
 
-public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<String, Integer, Integer>, CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>>> {
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ResultPartitionWriter.class)
+@PowerMockIgnore("javax.management.*")
+public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+	private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorTest.class);
 	private static File tmpDir;
 
 	private static final boolean EMBEDDED = true;
@@ -62,7 +76,11 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
 	private transient static ClusterBuilder builder = new ClusterBuilder() {
 		@Override
 		protected Cluster buildCluster(Cluster.Builder builder) {
-			return builder.addContactPoint("127.0.0.1").build();
+			return builder
+				.addContactPoint("127.0.0.1")
+				.withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
+				.withoutJMXReporting()
+				.withoutMetrics().build();
 		}
 	};
 	private static Cluster cluster;
@@ -102,7 +120,7 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
 	public static void startCassandra() throws IOException {
 		//generate temporary files
 		tmpDir = CommonTestUtils.createTempDirectory();
-		ClassLoader classLoader = CassandraIdempotentExactlyOnceSink.class.getClassLoader();
+		ClassLoader classLoader = CassandraTupleWriteAheadSink.class.getClassLoader();
 		File file = new File(classLoader.getResource("cassandra.yaml").getFile());
 		File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
 		tmp.createNewFile();
@@ -133,13 +151,24 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
 		} catch (InterruptedException e) { //give cassandra a few seconds to start up
 		}
 
-		cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
+		cluster = builder.getCluster();
 		session = cluster.connect();
 
 		session.execute(CREATE_KEYSPACE_QUERY);
 		session.execute(CREATE_TABLE_QUERY);
 	}
 
+	@Before
+	public void checkIfIgnore() {
+		String runtime = System.getProperty("java.runtime.name");
+		String version = System.getProperty("java.runtime.version");
+		LOG.info("Running tests on runtime: '{}', version: '{}'", runtime, version);
+		// The tests are failing on Oracle JDK 7 on Travis due to garbage collection issues.
+		// Oracle JDK identifies itself as "Java(TM) SE Runtime Environment"
+		// OpenJDK is "OpenJDK Runtime Environment"
+		Assume.assumeFalse(runtime.startsWith("Java") && version.startsWith("1.7"));
+	}
+
 	@After
 	public void deleteSchema() throws Exception {
 		session.executeAsync(CLEAR_TABLE_QUERY);
@@ -158,23 +187,12 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
 
 	//=====Exactly-Once=================================================================================================
 	@Override
-	protected CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>> createSink() {
-		try {
-			return new CassandraIdempotentExactlyOnceSink<>(
-				INSERT_DATA_QUERY,
-				TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
-				new ClusterBuilder() {
-					@Override
-					protected Cluster buildCluster(Cluster.Builder builder) {
-						return builder.addContactPoint("127.0.0.1").build();
-					}
-				},
-				"testJob",
-				new CassandraCommitter(builder));
-		} catch (Exception e) {
-			Assert.fail("Failure while initializing sink: " + e.getMessage());
-			return null;
-		}
+	protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
+		return new CassandraTupleWriteAheadSink<>(
+			INSERT_DATA_QUERY,
+			TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
+			builder,
+			new CassandraCommitter(builder));
 	}
 
 	@Override
@@ -191,7 +209,7 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
 	protected void verifyResultsIdealCircumstances(
 		OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
 		OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
-		CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>> sink) {
+		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
 		ResultSet result = session.execute(SELECT_DATA_QUERY);
 		ArrayList<Integer> list = new ArrayList<>();
@@ -209,7 +227,7 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
 	protected void verifyResultsDataPersistenceUponMissedNotify(
 		OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
 		OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
-		CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>> sink) {
+		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
 		ResultSet result = session.execute(SELECT_DATA_QUERY);
 		ArrayList<Integer> list = new ArrayList<>();
@@ -227,7 +245,7 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
 	protected void verifyResultsDataDiscardingUponRestore(
 		OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
 		OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
-		CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>> sink) {
+		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
 		ResultSet result = session.execute(SELECT_DATA_QUERY);
 		ArrayList<Integer> list = new ArrayList<>();
@@ -244,6 +262,60 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
 		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
 	}
 
+	@Test
+	public void testCassandraCommitter() throws Exception {
+		CassandraCommitter cc1 = new CassandraCommitter(builder);
+		cc1.setJobId("job");
+		cc1.setOperatorId("operator");
+		cc1.setOperatorSubtaskId(0);
+
+		CassandraCommitter cc2 = new CassandraCommitter(builder);
+		cc2.setJobId("job");
+		cc2.setOperatorId("operator");
+		cc2.setOperatorSubtaskId(1);
+
+		CassandraCommitter cc3 = new CassandraCommitter(builder);
+		cc3.setJobId("job");
+		cc3.setOperatorId("operator1");
+		cc3.setOperatorSubtaskId(0);
+
+		cc1.createResource();
+
+		cc1.open();
+		cc2.open();
+		cc3.open();
+
+		Assert.assertFalse(cc1.isCheckpointCommitted(1));
+		Assert.assertFalse(cc2.isCheckpointCommitted(1));
+		Assert.assertFalse(cc3.isCheckpointCommitted(1));
+
+		cc1.commitCheckpoint(1);
+		Assert.assertTrue(cc1.isCheckpointCommitted(1));
+		//verify that other sub-tasks aren't affected
+		Assert.assertFalse(cc2.isCheckpointCommitted(1));
+		//verify that other tasks aren't affected
+		Assert.assertFalse(cc3.isCheckpointCommitted(1));
+
+		Assert.assertFalse(cc1.isCheckpointCommitted(2));
+
+		cc1.close();
+		cc2.close();
+		cc3.close();
+
+		cc1 = new CassandraCommitter(builder);
+		cc1.setJobId("job");
+		cc1.setOperatorId("operator");
+		cc1.setOperatorSubtaskId(0);
+
+		cc1.open();
+
+		//verify that checkpoint data is not destroyed within open/close and not reliant on internally cached data
+		Assert.assertTrue(cc1.isCheckpointCommitted(1));
+		Assert.assertFalse(cc1.isCheckpointCommitted(2));
+
+		cc1.close();
+	}
+
 	//=====At-Least-Once================================================================================================
 	@Test
 	public void testCassandraTupleAtLeastOnceSink() throws Exception {
@@ -251,7 +323,7 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
 		env.setParallelism(1);
 
 		DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection);
-		source.addSink(new CassandraTupleAtLeastOnceSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
+		source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
 
 		env.execute();
 
@@ -287,7 +359,7 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
 				}
 			});
 
-		source.addSink(new CassandraPojoAtLeastOnceSink<>(Pojo.class, builder));
+		source.addSink(new CassandraPojoSink<>(Pojo.class, builder));
 
 		env.execute();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraIdempotentExactlyOnceSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraIdempotentExactlyOnceSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraIdempotentExactlyOnceSinkExample.java
deleted file mode 100644
index 5a0b299..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraIdempotentExactlyOnceSinkExample.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra.example;
-
-import com.datastax.driver.core.Cluster;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-import java.util.UUID;
-
-public class CassandraIdempotentExactlyOnceSinkExample {
-	public static void main(String[] args) throws Exception {
-
-		class MySource implements SourceFunction<Tuple2<String, Integer>>, Checkpointed<Integer> {
-			private int counter = 0;
-			private boolean stop = false;
-
-			@Override
-			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
-				while (!stop) {
-					Thread.sleep(50);
-					ctx.collect(new Tuple2<>("" + UUID.randomUUID(), 1));
-					counter++;
-					if (counter == 100) {
-						stop = true;
-					}
-				}
-			}
-
-			@Override
-			public void cancel() {
-				stop = true;
-			}
-
-			@Override
-			public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-				return counter;
-			}
-
-			@Override
-			public void restoreState(Integer state) throws Exception {
-				this.counter = state;
-			}
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-		env.enableCheckpointing(1000);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
-		env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend"));
-
-		CassandraSink<Tuple2<String, Integer>> sink = CassandraSink.addSink(env.addSource(new MySource()))
-			.setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-			.setConsistencyLevel(CassandraSink.ConsistencyLevel.EXACTLY_ONCE)
-			.setClusterBuilder(new ClusterBuilder() {
-				@Override
-				public Cluster buildCluster(Cluster.Builder builder) {
-					return builder.addContactPoint("127.0.0.1").build();
-				}
-			})
-			.build();
-
-		sink.name("Cassandra Sink").disableChaining().setParallelism(1).uid("hello");
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoAtLeastOnceSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoAtLeastOnceSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoAtLeastOnceSinkExample.java
deleted file mode 100644
index 8c116c7..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoAtLeastOnceSinkExample.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra.example;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-import java.util.ArrayList;
-
-public class CassandraPojoAtLeastOnceSinkExample {
-	private static final ArrayList<Message> messages = new ArrayList<>(20);
-
-	static {
-		for (long i = 0; i < 20; i++) {
-			messages.add(new Message("cassandra-" + i));
-		}
-	}
-
-	/*
-	 *	create table: "CREATE TABLE IF NOT EXISTS test.message(body txt PRIMARY KEY);"
-	 */
-	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Message> source = env.fromCollection(messages);
-
-		CassandraSink.addSink(source)
-			.setClusterBuilder(new ClusterBuilder() {
-				@Override
-				protected Cluster buildCluster(Builder builder) {
-					return builder.addContactPoint("127.0.0.1").build();
-				}
-			})
-			.build();
-
-		env.execute("Cassandra Sink example");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
new file mode 100644
index 0000000..e1bcea9
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra.example;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing how to use the Pojo Cassandra Sink in the Streaming API.
+ *
+ * Pojos have to be annotated with DataStax annotations to work with this sink.
+ *
+ * The example assumes that a table exists in a local Cassandra database, according to the following query:
+ * CREATE TABLE IF NOT EXISTS test.message(body text PRIMARY KEY)
+ */
+public class CassandraPojoSinkExample {
+	private static final ArrayList<Message> messages = new ArrayList<>(20);
+
+	static {
+		for (long i = 0; i < 20; i++) {
+			messages.add(new Message("cassandra-" + i));
+		}
+	}
+
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Message> source = env.fromCollection(messages);
+
+		CassandraSink.addSink(source)
+			.setClusterBuilder(new ClusterBuilder() {
+				@Override
+				protected Cluster buildCluster(Builder builder) {
+					return builder.addContactPoint("127.0.0.1").build();
+				}
+			})
+			.build();
+
+		env.execute("Cassandra Sink example");
+	}
+}
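
The Message class referenced above is not part of this diff; a minimal sketch of what such a Pojo might look like, assuming the DataStax object-mapper annotations (@Table, @Column) and a table in the "test" keyspace:

	import com.datastax.driver.mapping.annotations.Column;
	import com.datastax.driver.mapping.annotations.Table;

	import java.io.Serializable;

	@Table(keyspace = "test", name = "message")
	public class Message implements Serializable {
		@Column(name = "body")
		private String body;

		public Message() {
			//no-argument constructor required by the object mapper
		}

		public Message(String body) {
			this.body = body;
		}

		public String getBody() {
			return body;
		}

		public void setBody(String body) {
			this.body = body;
		}
	}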

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleAtLeastOnceSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleAtLeastOnceSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleAtLeastOnceSinkExample.java
deleted file mode 100644
index 25eb5c2..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleAtLeastOnceSinkExample.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra.example;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-import java.util.ArrayList;
-
-public class CassandraTupleAtLeastOnceSinkExample {
-	private static final String INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)";
-	private static final ArrayList<Tuple2<String, Integer>> collection = new ArrayList<>(20);
-
-	static {
-		for (int i = 0; i < 20; i++) {
-			collection.add(new Tuple2<>("cassandra-" + i, i));
-		}
-	}
-
-	/*
-	 * table script: "CREATE TABLE IF NOT EXISTS test.writetuple(element1 text PRIMARY KEY, element2 int)"
-	 */
-	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(collection);
-
-		CassandraSink.addSink(source)
-			.setQuery(INSERT)
-			.setClusterBuilder(new ClusterBuilder() {
-				@Override
-				protected Cluster buildCluster(Builder builder) {
-					return builder.addContactPoint("127.0.0.1").build();
-				}
-			})
-			.build();
-
-		env.execute("WriteTupleIntoCassandra");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
new file mode 100644
index 0000000..c6345df
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra.example;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing how to use the Tuple Cassandra Sink in the Streaming API.
+ *
+ * The example assumes that a table exists in a local Cassandra database, according to the following query:
+ * CREATE TABLE IF NOT EXISTS test.writetuple(element1 text PRIMARY KEY, element2 int)
+ */
+public class CassandraTupleSinkExample {
+	private static final String INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)";
+	private static final ArrayList<Tuple2<String, Integer>> collection = new ArrayList<>(20);
+
+	static {
+		for (int i = 0; i < 20; i++) {
+			collection.add(new Tuple2<>("cassandra-" + i, i));
+		}
+	}
+
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(collection);
+
+		CassandraSink.addSink(source)
+			.setQuery(INSERT)
+			.setClusterBuilder(new ClusterBuilder() {
+				@Override
+				protected Cluster buildCluster(Builder builder) {
+					return builder.addContactPoint("127.0.0.1").build();
+				}
+			})
+			.build();
+
+		env.execute("WriteTupleIntoCassandra");
+	}
+}