You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/06/16 14:58:33 UTC
[4/4] flink git commit: [FLINK-3332] Add Exactly-Once Cassandra
connector
[FLINK-3332] Add Exactly-Once Cassandra connector
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a28a2d09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a28a2d09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a28a2d09
Branch: refs/heads/master
Commit: a28a2d09626b02b18e846556945c70791e5c2502
Parents: 57ef6c3
Author: zentol <ch...@apache.org>
Authored: Tue Mar 8 17:00:24 2016 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 16 16:58:04 2016 +0200
----------------------------------------------------------------------
docs/apis/streaming/connectors/cassandra.md | 103 +++++++
docs/apis/streaming/connectors/index.md | 3 +-
docs/apis/streaming/fault_tolerance.md | 5 +
.../api/writer/ResultPartitionWriter.java | 2 +-
.../flink-connector-cassandra/pom.xml | 163 +++++++++++
.../cassandra/CassandraCommitter.java | 126 +++++++++
.../CassandraIdempotentExactlyOnceSink.java | 116 ++++++++
.../connectors/cassandra/CassandraSink.java | 282 +++++++++++++++++++
.../connectors/cassandra/ClusterBuilder.java | 31 ++
.../cassandra/CassandraConnectorTest.java | 219 ++++++++++++++
...ssandraIdempotentExactlyOnceSinkExample.java | 88 ++++++
.../src/test/resources/cassandra.yaml | 43 +++
.../src/test/resources/log4j-test.properties | 29 ++
flink-streaming-connectors/pom.xml | 1 +
.../runtime/operators/CheckpointCommitter.java | 114 ++++++++
.../operators/GenericAtLeastOnceSink.java | 197 +++++++++++++
.../operators/AtLeastOnceSinkTestBase.java | 218 ++++++++++++++
.../operators/GenericAtLeastOnceSinkTest.java | 147 ++++++++++
18 files changed, 1885 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/docs/apis/streaming/connectors/cassandra.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/cassandra.md b/docs/apis/streaming/connectors/cassandra.md
new file mode 100644
index 0000000..520354f
--- /dev/null
+++ b/docs/apis/streaming/connectors/cassandra.md
@@ -0,0 +1,103 @@
+---
+title: "Apache Cassandra Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 1
+sub-nav-title: Cassandra
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This connector provides sinks that write data into a [Cassandra](https://cassandra.apache.org/) database.
+
+To use this connector, add the following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-cassandra{{ site.scala_version_suffix }}</artifactId>
+ <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+#### Installing Apache Cassandra
+Follow the instructions from the [Cassandra Getting Started page](http://wiki.apache.org/cassandra/GettingStarted).
+
+#### Cassandra Sink
+
+Flink's Cassandra sinks are created by using the static CassandraSink.addSink(DataStream<IN> input) method.
+This method returns a CassandraSinkBuilder, which offers methods to further configure the sink.
+
+The following configuration methods can be used:
+
+1. setQuery(String query)
+2. setConsistencyLevel(ConsistencyLevel level)
+3. setClusterBuilder(ClusterBuilder builder)
+4. setCheckpointCommitter(CheckpointCommitter committer)
+5. build()
+
+setQuery() sets the query that is executed for every value the sink receives.
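+setConsistencyLevel() sets the desired consistency level (AT_LEAST_ONCE / EXACTLY_ONCE). Currently only EXACTLY_ONCE is implemented; calling build() with any other level throws an IllegalArgumentException.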
+setClusterBuilder() sets the cluster builder that is used to configure the connection to Cassandra.
+setCheckpointCommitter() is an optional method for EXACTLY_ONCE processing.
+
+A checkpoint committer stores additional information about completed checkpoints
+in some resource. You can use a `CassandraCommitter` to store these in a separate
+table in cassandra. Note that this table will NOT be cleaned up by Flink.
+
+build() finalizes the configuration and returns the CassandraSink.
+
+Example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ClusterBuilder clusterBuilder = new ClusterBuilder() {
+  @Override
+  public Cluster buildCluster(Cluster.Builder builder) {
+    return builder.addContactPoint("127.0.0.1").build();
+  }
+};
+
+CassandraSink.addSink(input)
+  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
+  .setConsistencyLevel(CassandraSink.ConsistencyLevel.EXACTLY_ONCE)
+  .setCheckpointCommitter(new CassandraCommitter(clusterBuilder))
+  .setClusterBuilder(clusterBuilder)
+  .build();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+CassandraSink.addSink(input)
+ .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
+ .setConsistencyLevel(CassandraSink.ConsistencyLevel.EXACTLY_ONCE)
+ .setCheckpointCommitter(new CassandraCommitter())
+ .setClusterBuilder(new ClusterBuilder() {
+ @Override
+ public Cluster buildCluster(Cluster.Builder builder) {
+ return builder.addContactPoint("127.0.0.1").build();
+ }
+ })
+ .build();
+{% endhighlight %}
+</div>
+</div>
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/docs/apis/streaming/connectors/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/index.md b/docs/apis/streaming/connectors/index.md
index 85a07a1..87182b7 100644
--- a/docs/apis/streaming/connectors/index.md
+++ b/docs/apis/streaming/connectors/index.md
@@ -4,7 +4,7 @@ title: "Streaming Connectors"
# Sub-level navigation
sub-nav-group: streaming
sub-nav-id: connectors
-sub-nav-pos: 8
+sub-nav-pos: 6
sub-nav-title: Connectors
---
<!--
@@ -38,6 +38,7 @@ Currently these systems are supported:
* [Amazon Kinesis Streams](http://aws.amazon.com/kinesis/streams/) (sink/source)
* [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis) (source)
* [Apache NiFi](https://nifi.apache.org) (sink/source)
+ * [Apache Cassandra](https://cassandra.apache.org/) (sink)
To run an application using one of these connectors, additional third party
components are usually required to be installed and launched, e.g. the servers
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/docs/apis/streaming/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/fault_tolerance.md b/docs/apis/streaming/fault_tolerance.md
index 7f861fc..43e65a9 100644
--- a/docs/apis/streaming/fault_tolerance.md
+++ b/docs/apis/streaming/fault_tolerance.md
@@ -177,6 +177,11 @@ state updates) of Flink coupled with bundled sinks:
<td></td>
</tr>
<tr>
+ <td>Cassandra sink</td>
+ <td>exactly-once</td>
+ <td>only for idempotent updates</td>
+ </tr>
+ <tr>
<td>File sinks</td>
<td>at least once</td>
<td></td>
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 79c21c6..cfab34d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -37,7 +37,7 @@ import java.io.IOException;
* The {@link ResultPartitionWriter} is the runtime API for producing results. It
* supports two kinds of data to be sent: buffers and events.
*/
-public final class ResultPartitionWriter implements EventListener<TaskEvent> {
+public class ResultPartitionWriter implements EventListener<TaskEvent> {
private final ResultPartition partition;
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/pom.xml b/flink-streaming-connectors/flink-connector-cassandra/pom.xml
new file mode 100644
index 0000000..5242c13
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/pom.xml
@@ -0,0 +1,163 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-connectors</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-cassandra_2.10</artifactId>
+ <name>flink-connector-cassandra</name>
+
+ <packaging>jar</packaging>
+
+ <!-- Allow users to pass custom connector versions -->
+ <properties>
+ <cassandra.version>2.2.5</cassandra.version>
+ <driver.version>3.0.0</driver.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <reuseForks>true</reuseForks>
+ <forkCount>1</forkCount>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.1</version>
+ <executions>
+ <!-- Run shade goal on package phase -->
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes>
+ <include>com.datastax.cassandra:cassandra-driver-core</include>
+ <include>com.datastax.cassandra:cassandra-driver-mapping</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>com.google</pattern>
+ <shadedPattern>org.apache.flink.shaded.com.google</shadedPattern>
+ <excludes>
+ <exclude>com.google.protobuf.**</exclude>
+ <exclude>com.google.inject.**</exclude>
+ </excludes>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ <version>${driver.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-mapping</artifactId>
+ <version>${driver.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>16.0.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cassandra</groupId>
+ <artifactId>cassandra-all</artifactId>
+ <version>${cassandra.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
new file mode 100644
index 0000000..c5d47e7
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+/**
+ * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
+ * database.
+ * <p/>
+ * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
+ */
+public class CassandraCommitter extends CheckpointCommitter {
+ private ClusterBuilder builder;
+ private transient Cluster cluster;
+ private transient Session session;
+
+ private static final String KEYSPACE = "flink_auxiliary";
+ private String TABLE = "checkpoints_";
+
+ private transient PreparedStatement deleteStatement;
+ private transient PreparedStatement updateStatement;
+ private transient PreparedStatement selectStatement;
+
+ public CassandraCommitter(ClusterBuilder builder) {
+ this.builder = builder;
+ ClosureCleaner.clean(builder, true);
+ }
+
+ /**
+ * Internally used to set the job ID after instantiation.
+ *
+ * @param id
+ * @throws Exception
+ */
+ public void setJobId(String id) throws Exception {
+ super.setJobId(id);
+ TABLE += id;
+ }
+
+ /**
+ * Generates the necessary tables to store information.
+ *
+ * @return
+ * @throws Exception
+ */
+ @Override
+ public void createResource() throws Exception {
+ cluster = builder.getCluster();
+ session = cluster.connect();
+
+ session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", KEYSPACE));
+ session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", KEYSPACE, TABLE));
+
+ try {
+ session.close();
+ } catch (Exception e) {
+ LOG.error("Error while closing session.", e);
+ }
+ try {
+ cluster.close();
+ } catch (Exception e) {
+ LOG.error("Error while closing cluster.", e);
+ }
+ }
+
+ @Override
+ public void open() throws Exception {
+ if (builder == null) {
+ throw new RuntimeException("No ClusterBuilder was set.");
+ }
+ cluster = builder.getCluster();
+ session = cluster.connect();
+
+ deleteStatement = session.prepare(String.format("DELETE FROM %s.%s where sink_id='%s' and sub_id=%d;", KEYSPACE, TABLE, operatorId, subtaskId));
+ updateStatement = session.prepare(String.format("UPDATE %s.%s set checkpoint_id=? where sink_id='%s' and sub_id=%d;", KEYSPACE, TABLE, operatorId, subtaskId));
+ selectStatement = session.prepare(String.format("SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", KEYSPACE, TABLE, operatorId, subtaskId));
+
+ session.execute(String.format("INSERT INTO %s.%s (sink_id, sub_id, checkpoint_id) values ('%s', %d, " + -1 + ");", KEYSPACE, TABLE, operatorId, subtaskId));
+ }
+
+ @Override
+ public void close() throws Exception {
+ session.executeAsync(deleteStatement.bind());
+ try {
+ session.close();
+ } catch (Exception e) {
+ LOG.error("Error while closing session.", e);
+ }
+ try {
+ cluster.close();
+ } catch (Exception e) {
+ LOG.error("Error while closing cluster.", e);
+ }
+ }
+
+ @Override
+ public void commitCheckpoint(long checkpointID) {
+ session.execute(updateStatement.bind(checkpointID));
+ }
+
+ @Override
+ public boolean isCheckpointCommitted(long checkpointID) {
+ long lastId = session.execute(selectStatement.bind()).one().getLong("checkpoint_id");
+ return checkpointID <= lastId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraIdempotentExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraIdempotentExactlyOnceSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraIdempotentExactlyOnceSink.java
new file mode 100644
index 0000000..b9de9c4
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraIdempotentExactlyOnceSink.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericAtLeastOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink is integrated with the checkpointing
+ * mechanism and provides exactly-once guarantees for idempotent updates.
+ * <p/>
+ * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class CassandraIdempotentExactlyOnceSink<IN extends Tuple> extends GenericAtLeastOnceSink<IN> {
+    protected transient Cluster cluster;
+    protected transient Session session;
+
+    private final String insertQuery;
+    private transient PreparedStatement preparedStatement;
+
+    // Written by the driver's callback and read in sendValue(); volatile so that a failure recorded
+    // on another thread is visible to the thread calling sendValue().
+    private transient volatile Throwable exception;
+    private transient FutureCallback<ResultSet> callback;
+
+    private final ClusterBuilder builder;
+
+    /**
+     * @param insertQuery query that is prepared once and bound with the fields of every record
+     * @param serializer serializer passed on to the superclass
+     * @param builder ClusterBuilder used to create the cluster connection
+     * @param jobID job ID passed on to the superclass
+     * @param committer CheckpointCommitter passed on to the superclass
+     * @throws Exception if the superclass constructor fails
+     */
+    protected CassandraIdempotentExactlyOnceSink(String insertQuery, TypeSerializer<IN> serializer, ClusterBuilder builder, String jobID, CheckpointCommitter committer) throws Exception {
+        super(committer, serializer, jobID);
+        this.insertQuery = insertQuery;
+        this.builder = builder;
+        ClosureCleaner.clean(builder, true);
+    }
+
+    /**
+     * Opens the superclass, connects to Cassandra and prepares the insert query.
+     *
+     * @throws Exception if the connection cannot be established or the query cannot be prepared
+     */
+    public void open() throws Exception {
+        super.open();
+        this.callback = new FutureCallback<ResultSet>() {
+            @Override
+            public void onSuccess(ResultSet resultSet) {
+            }
+
+            @Override
+            public void onFailure(Throwable throwable) {
+                // Remembered here and rethrown by the next sendValue() call.
+                exception = throwable;
+            }
+        };
+        cluster = builder.getCluster();
+        session = cluster.connect();
+        preparedStatement = session.prepare(insertQuery);
+    }
+
+    /**
+     * Closes the superclass and then the Cassandra connection. Session and cluster are closed even if
+     * closing the superclass fails; errors while closing them are logged.
+     *
+     * @throws Exception if closing the superclass fails
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            super.close();
+        } finally {
+            // session/cluster are null if open() failed before the connection was established
+            if (session != null) {
+                try {
+                    session.close();
+                } catch (Exception e) {
+                    LOG.error("Error while closing session.", e);
+                }
+            }
+            if (cluster != null) {
+                try {
+                    cluster.close();
+                } catch (Exception e) {
+                    LOG.error("Error while closing cluster.", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Binds the fields of every record to the prepared statement and sends it to Cassandra asynchronously,
+     * using the given timestamp as the statement's default timestamp.
+     *
+     * @param values records to write
+     * @param timestamp default timestamp set on every statement
+     * @throws Exception if an earlier asynchronous write reported a failure
+     */
+    @Override
+    protected void sendValue(Iterable<IN> values, long timestamp) throws Exception {
+        //verify that no query failed until now
+        if (exception != null) {
+            throw new Exception(exception);
+        }
+        // NOTE(review): writes are issued asynchronously and this method returns without waiting for
+        // them, so a failed write only surfaces on the next invocation - confirm the caller tolerates this.
+        for (IN value : values) {
+            //set values for prepared statement
+            final int arity = value.getArity();
+            Object[] fields = new Object[arity];
+            for (int x = 0; x < arity; x++) {
+                fields[x] = value.getField(x);
+            }
+            //insert values and send to cassandra
+            BoundStatement s = preparedStatement.bind(fields);
+            s.setDefaultTimestamp(timestamp);
+            ResultSetFuture result = session.executeAsync(s);
+            if (result != null) {
+                //add callback to detect errors
+                Futures.addCallback(result, callback);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
new file mode 100644
index 0000000..012ece5
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+import java.util.UUID;
+
+/**
+ * This class wraps different Cassandra sink implementations to provide a common interface for all of them.
+ * Instances are obtained through {@link #addSink(DataStream)} and the builder it returns.
+ *
+ * @param <IN> input type
+ */
+public class CassandraSink<IN> {
+    // Random UUID with '-' replaced by '_'; created once when this class is loaded and handed to every
+    // sink operator built through this class.
+    private static final String jobID = UUID.randomUUID().toString().replace("-", "_");
+    // Exactly one of sink1/sink2 is set; this flag tells which one.
+    private final boolean useDataStreamSink;
+    private DataStreamSink<IN> sink1;
+    private SingleOutputStreamOperator<IN> sink2;
+
+    private CassandraSink(DataStreamSink<IN> sink) {
+        sink1 = sink;
+        useDataStreamSink = true;
+    }
+
+    private CassandraSink(SingleOutputStreamOperator<IN> sink) {
+        sink2 = sink;
+        useDataStreamSink = false;
+    }
+
+    private SinkTransformation<IN> getSinkTransformation() {
+        return sink1.getTransformation();
+    }
+
+    private StreamTransformation<IN> getStreamTransformation() {
+        return sink2.getTransformation();
+    }
+
+    /**
+     * Sets the name of this sink. This name is
+     * used by the visualization and logging during runtime.
+     *
+     * @param name The name of this sink.
+     * @return The named sink.
+     */
+    public CassandraSink<IN> name(String name) {
+        if (useDataStreamSink) {
+            getSinkTransformation().setName(name);
+        } else {
+            getStreamTransformation().setName(name);
+        }
+        return this;
+    }
+
+    /**
+     * Sets an ID for this operator.
+     * <p/>
+     * <p>The specified ID is used to assign the same operator ID across job
+     * submissions (for example when starting a job from a savepoint).
+     * <p/>
+     * <p><strong>Important</strong>: this ID needs to be unique per
+     * transformation and job. Otherwise, job submission will fail.
+     *
+     * @param uid The unique user-specified ID of this transformation.
+     * @return The operator with the specified ID.
+     */
+    public CassandraSink<IN> uid(String uid) {
+        if (useDataStreamSink) {
+            getSinkTransformation().setUid(uid);
+        } else {
+            getStreamTransformation().setUid(uid);
+        }
+        return this;
+    }
+
+    /**
+     * Sets the parallelism for this sink. The degree must be higher than zero.
+     *
+     * @param parallelism The parallelism for this sink.
+     * @return The sink with set parallelism.
+     */
+    public CassandraSink<IN> setParallelism(int parallelism) {
+        if (useDataStreamSink) {
+            getSinkTransformation().setParallelism(parallelism);
+        } else {
+            getStreamTransformation().setParallelism(parallelism);
+        }
+        return this;
+    }
+
+    /**
+     * Turns off chaining for this operator so thread co-location will not be
+     * used as an optimization.
+     * <p/>
+     * <p/>
+     * Chaining can be turned off for the whole
+     * job by {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()}
+     * however it is not advised for performance considerations.
+     *
+     * @return The sink with chaining disabled
+     */
+    public CassandraSink<IN> disableChaining() {
+        if (useDataStreamSink) {
+            getSinkTransformation().setChainingStrategy(ChainingStrategy.NEVER);
+        } else {
+            getStreamTransformation().setChainingStrategy(ChainingStrategy.NEVER);
+        }
+        return this;
+    }
+
+    /**
+     * Sets the slot sharing group of this operation. Parallel instances of
+     * operations that are in the same slot sharing group will be co-located in the same
+     * TaskManager slot, if possible.
+     * <p/>
+     * <p>Operations inherit the slot sharing group of input operations if all input operations
+     * are in the same slot sharing group and no slot sharing group was explicitly specified.
+     * <p/>
+     * <p>Initially an operation is in the default slot sharing group. An operation can be put into
+     * the default group explicitly by setting the slot sharing group to {@code "default"}.
+     *
+     * @param slotSharingGroup The slot sharing group name.
+     * @return The sink with the slot sharing group set.
+     */
+    public CassandraSink<IN> slotSharingGroup(String slotSharingGroup) {
+        if (useDataStreamSink) {
+            getSinkTransformation().setSlotSharingGroup(slotSharingGroup);
+        } else {
+            getStreamTransformation().setSlotSharingGroup(slotSharingGroup);
+        }
+        return this;
+    }
+
+    /**
+     * Writes a DataStream into a Cassandra database.
+     *
+     * @param input input DataStream
+     * @param <IN> input type
+     * @param <T> the input type viewed as a Tuple type; only used internally for the cast
+     * @return CassandraSinkBuilder, to further configure the sink
+     * @throws IllegalArgumentException if the type of the input stream is not a tuple type
+     */
+    // The casts are guarded by the TupleTypeInfo check on the stream's type information.
+    @SuppressWarnings("unchecked")
+    public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
+        if (input.getType() instanceof TupleTypeInfo) {
+            DataStream<T> tupleInput = (DataStream<T>) input;
+            return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
+        } else {
+            throw new IllegalArgumentException("POJOs are currently not supported.");
+        }
+    }
+
+    /**
+     * Delivery guarantee requested from the sink.
+     */
+    public enum ConsistencyLevel {
+        At_LEAST_ONCE,
+        EXACTLY_ONCE
+    }
+
+    /**
+     * Base class of the builders returned by {@link #addSink(DataStream)}.
+     *
+     * @param <IN> input type
+     */
+    public abstract static class CassandraSinkBuilder<IN> {
+        protected final DataStream<IN> input;
+        protected final TypeSerializer<IN> serializer;
+        protected final TypeInformation<IN> typeInfo;
+        protected ConsistencyLevel consistency = ConsistencyLevel.At_LEAST_ONCE;
+        protected ClusterBuilder builder;
+        protected String query;
+        protected CheckpointCommitter committer;
+
+        public CassandraSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
+            this.input = input;
+            this.typeInfo = typeInfo;
+            this.serializer = serializer;
+        }
+
+        /**
+         * Sets the query that is to be executed for every record.
+         * This parameter is mandatory.
+         *
+         * @param query query to use
+         * @return this builder
+         */
+        public CassandraSinkBuilder<IN> setQuery(String query) {
+            this.query = query;
+            return this;
+        }
+
+        /**
+         * Configures the connection to use the given host as contact point on port 9042.
+         * This replaces any previously set ClusterBuilder.
+         *
+         * @param host contact point
+         * @return this builder
+         */
+        public CassandraSinkBuilder<IN> setHost(String host) {
+            return setHost(host, 9042);
+        }
+
+        /**
+         * Configures the connection to use the given host and port as contact point.
+         * This replaces any previously set ClusterBuilder.
+         *
+         * @param host contact point
+         * @param port port to connect to
+         * @return this builder
+         */
+        public CassandraSinkBuilder<IN> setHost(final String host, final int port) {
+            builder = new ClusterBuilder() {
+                @Override
+                protected Cluster buildCluster(Cluster.Builder builder) {
+                    return builder.addContactPoint(host).withPort(port).build();
+                }
+            };
+            return this;
+        }
+
+        /**
+         * Specifies the desired consistency level for this sink. Different sink implementations may be used depending
+         * on this parameter.
+         * If this method is not called, the level is At_LEAST_ONCE.
+         *
+         * @param consistency desired consistency level
+         * @return this builder
+         */
+        public CassandraSinkBuilder<IN> setConsistencyLevel(ConsistencyLevel consistency) {
+            this.consistency = consistency;
+            return this;
+        }
+
+        /**
+         * Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the connection to cassandra.
+         * This field is mandatory unless a host was set through setHost().
+         *
+         * @param builder ClusterBuilder to configure the connection to cassandra
+         * @return this builder
+         */
+        public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) {
+            this.builder = builder;
+            return this;
+        }
+
+        /**
+         * Sets the CheckpointCommitter for this sink. A CheckpointCommitter stores information about completed checkpoints
+         * in a resource outside of Flink.
+         * If the desired consistency level is EXACTLY_ONCE and this field is not set, a default committer will be used.
+         *
+         * @param committer CheckpointCommitter to use
+         * @return this builder
+         */
+        public CassandraSinkBuilder<IN> setCheckpointCommitter(CheckpointCommitter committer) {
+            this.committer = committer;
+            return this;
+        }
+
+        /**
+         * Finalizes the configuration of this sink.
+         *
+         * @return finalized sink
+         * @throws Exception if the sink cannot be created
+         */
+        public abstract CassandraSink<IN> build() throws Exception;
+    }
+
+    /**
+     * Builder for sinks that consume tuple streams.
+     *
+     * @param <IN> tuple type of the input
+     */
+    public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
+        public CassandraTupleSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
+            super(input, typeInfo, serializer);
+        }
+
+        /**
+         * Creates the sink operator and attaches it to the input stream.
+         *
+         * @return finalized sink
+         * @throws IllegalArgumentException if the consistency level is not EXACTLY_ONCE, or if no query or no
+         *                                  ClusterBuilder/host was set
+         * @throws Exception if the sink operator cannot be created
+         */
+        @Override
+        public CassandraSink<IN> build() throws Exception {
+            if (consistency != ConsistencyLevel.EXACTLY_ONCE) {
+                throw new IllegalArgumentException("There is currently no dedicated support for at-least-once guarantees.");
+            }
+            // Fail at build time with a clear message instead of later with an obscure error.
+            if (query == null) {
+                throw new IllegalArgumentException("No query was set. Use setQuery(String) before calling build().");
+            }
+            if (builder == null) {
+                throw new IllegalArgumentException("No ClusterBuilder was set. Use setClusterBuilder(ClusterBuilder) or setHost(String) before calling build().");
+            }
+            // Fall back to a CassandraCommitter on the same cluster when no committer was configured.
+            CheckpointCommitter effectiveCommitter = committer != null ? committer : new CassandraCommitter(builder);
+            return new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraIdempotentExactlyOnceSink<>(query, serializer, builder, jobID, effectiveCommitter)));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
new file mode 100644
index 0000000..d3db435
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+
+import java.io.Serializable;
+
+public abstract class ClusterBuilder implements Serializable {
+
+ public Cluster getCluster() {
+ return buildCluster(Cluster.builder());
+ }
+
+ protected abstract Cluster buildCluster(Cluster.Builder builder);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
new file mode 100644
index 0000000..bb7c0fb
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.runtime.operators.AtLeastOnceSinkTestBase;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Scanner;
+import java.util.UUID;
+
+public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<String, Integer, Integer>, CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>>> {
+ private static File tmpDir;
+
+ private static final boolean EMBEDDED = true;
+ private static EmbeddedCassandraService cassandra;
+ private static Cluster cluster;
+ private static Session session;
+
+ private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
+ private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;";
+ private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);";
+ private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;";
+ private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)";
+ private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;";
+
+ private static class EmbeddedCassandraService {
+ CassandraDaemon cassandraDaemon;
+
+ public void start() throws IOException {
+ this.cassandraDaemon = new CassandraDaemon();
+ this.cassandraDaemon.init(null);
+ this.cassandraDaemon.start();
+ }
+
+ public void stop() {
+ this.cassandraDaemon.stop();
+ }
+ }
+
+ @BeforeClass
+ public static void startCassandra() throws IOException {
+ //generate temporary files
+ tmpDir = CommonTestUtils.createTempDirectory();
+ ClassLoader classLoader = CassandraIdempotentExactlyOnceSink.class.getClassLoader();
+ File file = new File(classLoader.getResource("cassandra.yaml").getFile());
+ File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
+ tmp.createNewFile();
+ BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
+
+ //copy cassandra.yaml; inject absolute paths into cassandra.yaml
+ Scanner scanner = new Scanner(file);
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine();
+ line = line.replace("$PATH", "'" + tmp.getParentFile());
+ b.write(line + "\n");
+ b.flush();
+ }
+ scanner.close();
+
+
+ // Tell cassandra where the configuration files are.
+ // Use the test configuration file.
+ System.setProperty("cassandra.config", "file:" + File.separator + File.separator + File.separator + tmp.getAbsolutePath());
+
+ if (EMBEDDED) {
+ cassandra = new EmbeddedCassandraService();
+ cassandra.start();
+ }
+
+ cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
+ session = cluster.connect();
+
+ session.execute(CREATE_KEYSPACE_QUERY);
+ session.execute(CREATE_TABLE_QUERY);
+ }
+
+ @After
+ public void deleteSchema() throws Exception {
+ session.executeAsync(CLEAR_TABLE_QUERY);
+ }
+
+ @AfterClass
+ public static void closeCassandra() {
+ session.executeAsync(DROP_KEYSPACE_QUERY);
+ session.close();
+ cluster.close();
+ if (EMBEDDED) {
+ cassandra.stop();
+ }
+ tmpDir.delete();
+ }
+
+ @Override
+ protected CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>> createSink() {
+ try {
+ return new CassandraIdempotentExactlyOnceSink<>(
+ INSERT_DATA_QUERY,
+ TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
+ new ClusterBuilder() {
+ @Override
+ protected Cluster buildCluster(Cluster.Builder builder) {
+ return builder.addContactPoint("127.0.0.1").build();
+ }
+ },
+ "testJob",
+ new CassandraCommitter(new ClusterBuilder() {
+ @Override
+ protected Cluster buildCluster(Cluster.Builder builder) {
+ return builder.addContactPoint("127.0.0.1").build();
+ }
+ }));
+ } catch (Exception e) {
+ Assert.fail("Failure while initializing sink: " + e.getMessage());
+ return null;
+ }
+ }
+
+ @Override
+ protected TupleTypeInfo<Tuple3<String, Integer, Integer>> createTypeInfo() {
+ return TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Integer.class);
+ }
+
+ @Override
+ protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) {
+ return new Tuple3<>("" + UUID.randomUUID(), counter, checkpointID);
+ }
+
+ @Override
+ protected void verifyResultsIdealCircumstances(
+ OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
+ OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
+ CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>> sink) {
+
+ ResultSet result = session.execute(SELECT_DATA_QUERY);
+ ArrayList<Integer> list = new ArrayList<>();
+ for (int x = 1; x <= 60; x++) {
+ list.add(x);
+ }
+
+ for (Row s : result) {
+ list.remove(new Integer(s.getInt("counter")));
+ }
+ Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
+ }
+
+ @Override
+ protected void verifyResultsDataPersistenceUponMissedNotify(
+ OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
+ OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
+ CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>> sink) {
+
+ ResultSet result = session.execute(SELECT_DATA_QUERY);
+ ArrayList<Integer> list = new ArrayList<>();
+ for (int x = 1; x <= 60; x++) {
+ list.add(x);
+ }
+
+ for (Row s : result) {
+ list.remove(new Integer(s.getInt("counter")));
+ }
+ Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
+ }
+
+ @Override
+ protected void verifyResultsDataDiscardingUponRestore(
+ OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
+ OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
+ CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>> sink) {
+
+ ResultSet result = session.execute(SELECT_DATA_QUERY);
+ ArrayList<Integer> list = new ArrayList<>();
+ for (int x = 1; x <= 20; x++) {
+ list.add(x);
+ }
+ for (int x = 41; x <= 60; x++) {
+ list.add(x);
+ }
+
+ for (Row s : result) {
+ list.remove(new Integer(s.getInt("counter")));
+ }
+ Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraIdempotentExactlyOnceSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraIdempotentExactlyOnceSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraIdempotentExactlyOnceSinkExample.java
new file mode 100644
index 0000000..5a0b299
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraIdempotentExactlyOnceSinkExample.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra.example;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import java.util.UUID;
+
+/**
+ * Example job that writes 100 generated records into the Cassandra table {@code example.values}
+ * through a {@link CassandraSink} configured for exactly-once consistency.
+ * <p/>
+ * The job connects to a Cassandra instance at 127.0.0.1; the query below expects the table
+ * {@code example.values} with the columns {@code id} and {@code counter}.
+ */
+public class CassandraIdempotentExactlyOnceSinkExample {
+    public static void main(String[] args) throws Exception {
+
+        /** Source emitting one record every 50 ms until 100 records were emitted; the count is checkpointed. */
+        class MySource implements SourceFunction<Tuple2<String, Integer>>, Checkpointed<Integer> {
+            private int counter = 0;
+            //written by cancel() from a different thread than the one executing run()
+            private volatile boolean stop = false;
+
+            @Override
+            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+                //checking the counter in the loop condition also terminates a source restored with counter >= 100
+                while (!stop && counter < 100) {
+                    Thread.sleep(50);
+                    //emit and count under the checkpoint lock so that a snapshot never sees
+                    //a counter that disagrees with the records emitted so far
+                    synchronized (ctx.getCheckpointLock()) {
+                        ctx.collect(new Tuple2<>("" + UUID.randomUUID(), 1));
+                        counter++;
+                    }
+                }
+            }
+
+            @Override
+            public void cancel() {
+                stop = true;
+            }
+
+            @Override
+            public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+                return counter;
+            }
+
+            @Override
+            public void restoreState(Integer state) throws Exception {
+                this.counter = state;
+            }
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
+        env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend"));
+
+        CassandraSink<Tuple2<String, Integer>> sink = CassandraSink.addSink(env.addSource(new MySource()))
+            .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
+            .setConsistencyLevel(CassandraSink.ConsistencyLevel.EXACTLY_ONCE)
+            .setClusterBuilder(new ClusterBuilder() {
+                @Override
+                public Cluster buildCluster(Cluster.Builder builder) {
+                    return builder.addContactPoint("127.0.0.1").build();
+                }
+            })
+            .build();
+
+        sink.name("Cassandra Sink").disableChaining().setParallelism(1).uid("hello");
+
+        env.execute();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
new file mode 100644
index 0000000..32c18ea
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
@@ -0,0 +1,43 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+cluster_name: 'Test Cluster'
+commitlog_sync: 'periodic'
+commitlog_sync_period_in_ms: 10000
+commitlog_segment_size_in_mb: 16
+partitioner: 'org.apache.cassandra.dht.RandomPartitioner'
+endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch'
+# $PATH is a placeholder: the test setup replaces it with an opening quote followed by the
+# absolute path of a temporary directory, which is why only the closing quote appears here.
+# '/' is used as separator because it is accepted on Windows as well as on POSIX systems;
+# a '\' would become part of the directory name outside of Windows.
+commitlog_directory: $PATH/commit'
+data_file_directories:
+    - $PATH/data'
+saved_caches_directory: $PATH/cache'
+listen_address: '127.0.0.1'
+seed_provider:
+    - class_name: 'org.apache.cassandra.locator.SimpleSeedProvider'
+      parameters:
+          - seeds: '127.0.0.1'
+native_transport_port: 9042
+
+concurrent_reads: 8
+concurrent_writes: 8
+
+auto_bootstrap: false
+auto_snapshot: false
+
+start_rpc: false
+start_native_transport: true
+native_transport_max_threads: 8
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..6d70ebd
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
@@ -0,0 +1,29 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Root logger: level ERROR, output goes to the "testlogger" appender configured below.
+log4j.rootLogger=ERROR, testlogger
+
+# "testlogger" is a console appender writing to System.err with the pattern given below.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/pom.xml b/flink-streaming-connectors/pom.xml
index 2bfd9b2..91691cf 100644
--- a/flink-streaming-connectors/pom.xml
+++ b/flink-streaming-connectors/pom.xml
@@ -45,6 +45,7 @@ under the License.
<module>flink-connector-rabbitmq</module>
<module>flink-connector-twitter</module>
<module>flink-connector-nifi</module>
+ <module>flink-connector-cassandra</module>
</modules>
<!-- See main pom.xml for explanation of profiles -->
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java
new file mode 100644
index 0000000..22e629e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * This class is used to save information about which sink operator instance has committed checkpoints to a backend.
+ * <p/>
+ * The current checkpointing mechanism is ill-suited for sinks relying on backends that do not support roll-backs.
+ * When dealing with such a system, while trying to get exactly-once semantics, one may neither commit data while
+ * creating the snapshot (since another sink instance may fail, leading to a replay of the same data) nor when receiving
+ * a checkpoint-complete notification (since a subsequent failure would leave us with no knowledge as to whether data
+ * was committed or not).
+ * <p/>
+ * A CheckpointCommitter can be used to solve the second problem by saving whether an instance committed all data
+ * belonging to a checkpoint. This data must be stored in a backend that is persistent across retries (which rules
+ * out Flink's state mechanism) and accessible from all machines, like a database or distributed file.
+ * <p/>
+ * There is no mandate as to how the resource is shared; there may be one resource for all Flink jobs, or one for
+ * each job/operator/-instance separately. This implies that the resource must not be cleaned up by the system itself,
+ * and as such should be kept as small as possible.
+ */
+public abstract class CheckpointCommitter implements Serializable {
+ protected static final Logger LOG = LoggerFactory.getLogger(CheckpointCommitter.class);
+ protected String jobId; // set through setJobId(String)
+ protected String operatorId; // set through setOperatorId(String)
+ protected int subtaskId; // set through setOperatorSubtaskId(int)
+
+ /**
+ * Internally used to set the job ID after instantiation.
+ *
+ * @param id the job ID to store
+ * @throws Exception declared so that overriding implementations may fail; this implementation only stores the value
+ */
+ public void setJobId(String id) throws Exception {
+ this.jobId = id;
+ }
+
+ /**
+ * Internally used to set the operator ID after instantiation.
+ *
+ * @param id the operator ID to store
+ * @throws Exception declared so that overriding implementations may fail; this implementation only stores the value
+ */
+ public void setOperatorId(String id) throws Exception {
+ this.operatorId = id;
+ }
+
+ /**
+ * Internally used to set the operator subtask ID after instantiation.
+ *
+ * @param id the subtask ID to store
+ * @throws Exception declared so that overriding implementations may fail; this implementation only stores the value
+ */
+ public void setOperatorSubtaskId(int id) throws Exception {
+ this.subtaskId = id;
+ }
+
+ /**
+ * Opens/connects to the resource, and possibly creates it beforehand.
+ *
+ * @throws Exception if the resource cannot be opened
+ */
+ public abstract void open() throws Exception;
+
+ /**
+ * Closes the resource/connection to it. The resource should generally still exist after this call.
+ *
+ * @throws Exception if the resource cannot be closed
+ */
+ public abstract void close() throws Exception;
+
+ /**
+ * Creates/opens/connects to the resource that is used to store information. Called once directly after instantiation.
+ *
+ * @throws Exception if the resource cannot be created
+ */
+ public abstract void createResource() throws Exception;
+
+ /**
+ * Marks the given checkpoint as completed in the resource.
+ *
+ * @param checkpointID ID of the checkpoint to mark as completed
+ * @throws Exception if the information cannot be stored
+ */
+ public abstract void commitCheckpoint(long checkpointID) throws Exception;
+
+ /**
+ * Checks the resource whether the given checkpoint was committed completely.
+ *
+ * @param checkpointID ID of the checkpoint to look up
+ * @return true if the checkpoint was committed completely, false otherwise
+ * @throws Exception if the resource cannot be queried
+ */
+ public abstract boolean isCheckpointCommitted(long checkpointID) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java
new file mode 100644
index 0000000..eb4aec3
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSink.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+/**
+ * Generic Sink that emits its input elements into an arbitrary backend. This sink is integrated with the checkpointing
+ * mechanism and can provide exactly-once guarantees; depending on the storage backend and sink/committer implementation.
+ * <p/>
+ * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public abstract class GenericAtLeastOnceSink<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
+    protected static final Logger LOG = LoggerFactory.getLogger(GenericAtLeastOnceSink.class);
+    private final CheckpointCommitter committer;
+    //view into which the elements received since the last snapshot are serialized; null until an element arrives
+    private transient AbstractStateBackend.CheckpointStateOutputView out;
+    protected final TypeSerializer<IN> serializer;
+    //random identifier generated in the constructor and passed to the committer as operator ID
+    private final String id;
+
+    private ExactlyOnceState state = new ExactlyOnceState();
+
+    /**
+     * Creates the sink, passes the job ID to the committer and lets the committer create its resource.
+     *
+     * @param committer  committer used to remember which checkpoints were already written
+     * @param serializer serializer used to buffer and replay the incoming elements
+     * @param jobID      job ID passed on to the committer
+     * @throws Exception if the committer fails to store the job ID or to create its resource
+     */
+    public GenericAtLeastOnceSink(CheckpointCommitter committer, TypeSerializer<IN> serializer, String jobID) throws Exception {
+        this.committer = committer;
+        this.serializer = serializer;
+        this.id = UUID.randomUUID().toString();
+        this.committer.setJobId(jobID);
+        this.committer.createResource();
+    }
+
+    /**
+     * Passes operator ID and subtask index to the committer and opens it.
+     *
+     * @throws Exception if the committer cannot be opened
+     */
+    @Override
+    public void open() throws Exception {
+        super.open();
+        committer.setOperatorId(id);
+        committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
+        committer.open();
+    }
+
+    /**
+     * Closes the committer.
+     *
+     * @throws Exception if the committer cannot be closed
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            committer.close();
+        } finally {
+            //run the superclass shutdown even if closing the committer failed
+            super.close();
+        }
+    }
+
+    /**
+     * Closes the current output view, if there is one, and registers its handle under the given checkpoint ID.
+     *
+     * @param checkpointId ID of the checkpoint the buffered elements belong to
+     * @param timestamp    timestamp of that checkpoint
+     * @throws IOException if the output view cannot be closed
+     */
+    private void saveHandleInState(final long checkpointId, final long timestamp) throws IOException {
+        synchronized (state) {
+            //only add handle if a new OperatorState was created since the last snapshot
+            if (out != null) {
+                StateHandle<DataInputView> handle = out.closeAndGetHandle();
+                state.pendingHandles.put(checkpointId, new Tuple2<>(timestamp, handle));
+                out = null;
+            }
+        }
+    }
+
+    @Override
+    public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception {
+        synchronized (state) {
+            StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+            saveHandleInState(checkpointId, timestamp);
+
+            taskState.setFunctionState(state);
+            return taskState;
+        }
+    }
+
+    @Override
+    public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
+        super.restoreState(state, recoveryTimestamp);
+        this.state = (ExactlyOnceState) state.getFunctionState();
+        //elements buffered before the restore are dropped
+        out = null;
+    }
+
+    /**
+     * Writes the elements of all pending checkpoints up to and including the given one into the backend, unless
+     * the committer reports them as already committed, and discards their handles afterwards.
+     *
+     * @param checkpointId ID of the completed checkpoint
+     * @throws Exception if reading the buffered elements, writing them or committing fails
+     */
+    @Override
+    public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
+        super.notifyOfCompletedCheckpoint(checkpointId);
+
+        //use the same monitor as the snapshot path, which adds entries to pendingHandles under the lock on state
+        synchronized (state) {
+            //the key set of the TreeMap is iterated in ascending order, so older checkpoints are written first
+            Set<Long> pastCheckpointIds = state.pendingHandles.keySet();
+            Set<Long> checkpointsToRemove = new HashSet<>();
+            for (Long pastCheckpointId : pastCheckpointIds) {
+                if (pastCheckpointId <= checkpointId) {
+                    if (!committer.isCheckpointCommitted(pastCheckpointId)) {
+                        Tuple2<Long, StateHandle<DataInputView>> handle = state.pendingHandles.get(pastCheckpointId);
+                        DataInputView in = handle.f1.getState(getUserCodeClassloader());
+                        sendValue(new ReusingMutableToRegularIteratorWrapper<>(new InputViewIterator<>(in, serializer), serializer), handle.f0);
+                        committer.commitCheckpoint(pastCheckpointId);
+                    }
+                    checkpointsToRemove.add(pastCheckpointId);
+                }
+            }
+            //removal happens in a second pass because the key set must not be modified while it is iterated
+            for (Long toRemove : checkpointsToRemove) {
+                Tuple2<Long, StateHandle<DataInputView>> handle = state.pendingHandles.remove(toRemove);
+                handle.f1.discardState();
+            }
+        }
+    }
+
+
+    /**
+     * Writes the given elements into the backend.
+     *
+     * @param value     elements to be written
+     * @param timestamp timestamp of the checkpoint the elements belong to
+     * @throws Exception if the elements cannot be written
+     */
+    protected abstract void sendValue(Iterable<IN> value, long timestamp) throws Exception;
+
+    /**
+     * Serializes the element into the current output view; the view is created on demand.
+     *
+     * @param element record to buffer
+     * @throws Exception if the view cannot be created or the element cannot be serialized
+     */
+    @Override
+    public void processElement(StreamRecord<IN> element) throws Exception {
+        IN value = element.getValue();
+        //generate initial operator state
+        if (out == null) {
+            out = getStateBackend().createCheckpointStateOutputView(0, 0);
+        }
+        serializer.serialize(value, out);
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        //don't do anything, since we are a sink
+    }
+
+    /**
+     * This state is used to keep a list of all StateHandles (essentially references to past OperatorStates) that were
+     * used since the last completed checkpoint.
+     * <p/>
+     * NOTE(review): this is a non-static inner class, so every instance references the enclosing sink; confirm
+     * that this is intended for a state object that is handed to the checkpointing mechanism.
+     **/
+    public class ExactlyOnceState implements StateHandle<Serializable> {
+        //checkpoint ID -> (checkpoint timestamp, handle to the elements buffered for that checkpoint)
+        protected TreeMap<Long, Tuple2<Long, StateHandle<DataInputView>>> pendingHandles;
+
+        public ExactlyOnceState() {
+            pendingHandles = new TreeMap<>();
+        }
+
+        @Override
+        public TreeMap<Long, Tuple2<Long, StateHandle<DataInputView>>> getState(ClassLoader userCodeClassLoader) throws Exception {
+            return pendingHandles;
+        }
+
+        @Override
+        public void discardState() throws Exception {
+            for (Tuple2<Long, StateHandle<DataInputView>> pair : pendingHandles.values()) {
+                pair.f1.discardState();
+            }
+            pendingHandles = new TreeMap<>();
+        }
+
+        @Override
+        public long getStateSize() throws Exception {
+            //accumulate in a long: "int += long" compiles but silently narrows the sum back to int
+            long stateSize = 0;
+            for (Tuple2<Long, StateHandle<DataInputView>> pair : pendingHandles.values()) {
+                stateSize += pair.f1.getStateSize();
+            }
+            return stateSize;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/AtLeastOnceSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/AtLeastOnceSinkTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/AtLeastOnceSinkTestBase.java
new file mode 100644
index 0000000..1f2fa54
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/AtLeastOnceSinkTestBase.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+
+/**
+ * Base class for tests of {@link GenericAtLeastOnceSink} implementations.
+ *
+ * <p>Every test feeds three batches of generated elements into the sink, takes snapshots of the
+ * operator state in between and finally delegates the verification of the sink's output to the
+ * concrete test class.
+ *
+ * @param <IN> type of the elements written to the sink
+ * @param <S>  type of the sink under test
+ */
+public abstract class AtLeastOnceSinkTestBase<IN, S extends GenericAtLeastOnceSink<IN>> {
+
+    /** Number of elements that are processed between two snapshots. */
+    private static final int ELEMENTS_PER_BATCH = 20;
+
+    /**
+     * Task that grants the tests access to its head operator.
+     *
+     * @param <T> element type of the task; deliberately not called {@code IN} so that it does not
+     *            shadow the type parameter of the enclosing class
+     */
+    protected class OperatorExposingTask<T> extends OneInputStreamTask<T, T> {
+        public OneInputStreamOperator<T, T> getOperator() {
+            return this.headOperator;
+        }
+    }
+
+    /** Creates the task that runs the sink. */
+    protected OperatorExposingTask<IN> createTask() {
+        return new OperatorExposingTask<>();
+    }
+
+    /** Creates the sink under test. */
+    protected abstract S createSink() throws Exception;
+
+    /** Creates the type information for the elements written to the sink. */
+    protected abstract TypeInformation<IN> createTypeInfo();
+
+    /**
+     * Generates a single input element.
+     *
+     * @param counter      running number of the element, starting at 1
+     * @param checkpointID index of the batch (0, 1 or 2) the element belongs to
+     */
+    protected abstract IN generateValue(int counter, int checkpointID);
+
+    /** Verifies the sink output of {@link #testIdealCircumstances()}. */
+    protected abstract void verifyResultsIdealCircumstances(
+        OneInputStreamTaskTestHarness<IN, IN> harness, OneInputStreamTask<IN, IN> task, S sink) throws Exception;
+
+    /** Verifies the sink output of {@link #testDataPersistenceUponMissedNotify()}. */
+    protected abstract void verifyResultsDataPersistenceUponMissedNotify(
+        OneInputStreamTaskTestHarness<IN, IN> harness, OneInputStreamTask<IN, IN> task, S sink) throws Exception;
+
+    /** Verifies the sink output of {@link #testDataDiscardingUponRestore()}. */
+    protected abstract void verifyResultsDataDiscardingUponRestore(
+        OneInputStreamTaskTestHarness<IN, IN> harness, OneInputStreamTask<IN, IN> task, S sink) throws Exception;
+
+    /**
+     * Every batch is followed by a snapshot and a completion notification for that snapshot.
+     */
+    @Test
+    public void testIdealCircumstances() throws Exception {
+        OperatorExposingTask<IN> task = createTask();
+        OneInputStreamTaskTestHarness<IN, IN> testHarness = createHarness(task, createSink());
+
+        testHarness.invoke();
+        testHarness.waitForTaskRunning();
+
+        ArrayList<StreamTaskState> states = new ArrayList<>();
+
+        processBatch(testHarness, 0);
+        snapshot(task, states);
+        task.notifyCheckpointComplete(states.size() - 1);
+
+        processBatch(testHarness, 1);
+        snapshot(task, states);
+        task.notifyCheckpointComplete(states.size() - 1);
+
+        processBatch(testHarness, 2);
+        snapshot(task, states);
+        task.notifyCheckpointComplete(states.size() - 1);
+
+        testHarness.endInput();
+
+        snapshot(task, states);
+        testHarness.waitForTaskCompletion();
+
+        verifyResultsIdealCircumstances(testHarness, task, sinkOf(task));
+    }
+
+    /**
+     * The snapshot taken after the second batch is never followed by a completion notification;
+     * only the first and the third snapshot are notified.
+     */
+    @Test
+    public void testDataPersistenceUponMissedNotify() throws Exception {
+        S sink = createSink();
+        OperatorExposingTask<IN> task = createTask();
+        OneInputStreamTaskTestHarness<IN, IN> testHarness = createHarness(task, sink);
+
+        testHarness.invoke();
+        testHarness.waitForTaskRunning();
+
+        ArrayList<StreamTaskState> states = new ArrayList<>();
+
+        processBatch(testHarness, 0);
+        snapshot(task, states);
+        task.notifyCheckpointComplete(states.size() - 1);
+
+        // snapshot without notification
+        processBatch(testHarness, 1);
+        snapshot(task, states);
+
+        processBatch(testHarness, 2);
+        snapshot(task, states);
+        task.notifyCheckpointComplete(states.size() - 1);
+
+        testHarness.endInput();
+
+        snapshot(task, states);
+        testHarness.waitForTaskCompletion();
+
+        verifyResultsDataPersistenceUponMissedNotify(testHarness, task, sinkOf(task));
+    }
+
+    /**
+     * The second batch is processed without taking a snapshot; afterwards the operator is
+     * restored from the snapshot that was taken after the first batch.
+     */
+    @Test
+    public void testDataDiscardingUponRestore() throws Exception {
+        S sink = createSink();
+        OperatorExposingTask<IN> task = createTask();
+        OneInputStreamTaskTestHarness<IN, IN> testHarness = createHarness(task, sink);
+
+        testHarness.invoke();
+        testHarness.waitForTaskRunning();
+
+        ArrayList<StreamTaskState> states = new ArrayList<>();
+
+        processBatch(testHarness, 0);
+        snapshot(task, states);
+        task.notifyCheckpointComplete(states.size() - 1);
+
+        // no snapshot for this batch; restore the most recent stored snapshot instead
+        processBatch(testHarness, 1);
+        task.getOperator().restoreState(states.get(states.size() - 1), 0);
+
+        processBatch(testHarness, 2);
+        snapshot(task, states);
+        task.notifyCheckpointComplete(states.size() - 1);
+
+        testHarness.endInput();
+
+        snapshot(task, states);
+        testHarness.waitForTaskCompletion();
+
+        verifyResultsDataDiscardingUponRestore(testHarness, task, sinkOf(task));
+    }
+
+    /** Creates a test harness for the given task and registers the sink as its stream operator. */
+    private OneInputStreamTaskTestHarness<IN, IN> createHarness(OperatorExposingTask<IN> task, S sink) throws Exception {
+        TypeInformation<IN> info = createTypeInfo();
+        OneInputStreamTaskTestHarness<IN, IN> harness = new OneInputStreamTaskTestHarness<>(task, 1, 1, info, info);
+        StreamConfig streamConfig = harness.getStreamConfig();
+        streamConfig.setStreamOperator(sink);
+        return harness;
+    }
+
+    /**
+     * Feeds one batch of elements into the harness and waits until the input was processed.
+     * Batch {@code b} consists of the counters {@code b * 20 + 1} to {@code b * 20 + 20}, so the
+     * three batches of a test cover the counters 1 to 60 without gaps.
+     */
+    private void processBatch(OneInputStreamTaskTestHarness<IN, IN> harness, int batch) throws Exception {
+        int firstCounter = batch * ELEMENTS_PER_BATCH + 1;
+        for (int offset = 0; offset < ELEMENTS_PER_BATCH; offset++) {
+            harness.processElement(new StreamRecord<>(generateValue(firstCounter + offset, batch)));
+        }
+        harness.waitForInputProcessing();
+    }
+
+    /** Snapshots the operator, using the number of stored states as checkpoint ID, and stores a copy of the result. */
+    private void snapshot(OperatorExposingTask<IN> task, ArrayList<StreamTaskState> states) throws Exception {
+        states.add(copyTaskState(task.getOperator().snapshotOperatorState(states.size(), 0)));
+    }
+
+    /** Returns the operator executed by the task, viewed as the sink type under test. */
+    @SuppressWarnings("unchecked") // the only operator ever registered by these tests is a sink of type S
+    private S sinkOf(OperatorExposingTask<IN> task) {
+        return (S) task.getOperator();
+    }
+
+    /**
+     * Creates a deep copy of the given task state through Java serialization.
+     */
+    private StreamTaskState copyTaskState(StreamTaskState toCopy) throws IOException, ClassNotFoundException {
+        // NOTE(review): presumably the lock keeps the function state from being modified
+        // while it is being serialized - confirm against the sink implementation.
+        synchronized (toCopy.getFunctionState()) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+                oos.writeObject(toCopy);
+            }
+
+            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
+                return (StreamTaskState) ois.readObject();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a28a2d09/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSinkTest.java
new file mode 100644
index 0000000..72c71ad
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericAtLeastOnceSinkTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class GenericAtLeastOnceSinkTest extends AtLeastOnceSinkTestBase<Tuple1<Integer>, GenericAtLeastOnceSinkTest.ListSink> {
+ @Override
+ protected ListSink createSink() throws Exception {
+ return new ListSink();
+ }
+
+ @Override
+ protected TupleTypeInfo<Tuple1<Integer>> createTypeInfo() {
+ return TupleTypeInfo.getBasicTupleTypeInfo(Integer.class);
+ }
+
+ @Override
+ protected Tuple1<Integer> generateValue(int counter, int checkpointID) {
+ return new Tuple1<>(counter);
+ }
+
+ @Override
+ protected void verifyResultsIdealCircumstances(
+ OneInputStreamTaskTestHarness<Tuple1<Integer>, Tuple1<Integer>> harness,
+ OneInputStreamTask<Tuple1<Integer>, Tuple1<Integer>> task, ListSink sink) {
+
+ ArrayList<Integer> list = new ArrayList<>();
+ for (int x = 1; x <= 60; x++) {
+ list.add(x);
+ }
+
+ for (Integer i : sink.values) {
+ list.remove(i);
+ }
+ Assert.assertTrue("The following ID's where not found in the result list: " + list.toString(), list.isEmpty());
+ Assert.assertTrue("The sink emitted to many values: " + (sink.values.size() - 60), sink.values.size() == 60);
+ }
+
+ @Override
+ protected void verifyResultsDataPersistenceUponMissedNotify(
+ OneInputStreamTaskTestHarness<Tuple1<Integer>, Tuple1<Integer>> harness,
+ OneInputStreamTask<Tuple1<Integer>, Tuple1<Integer>> task, ListSink sink) {
+
+ ArrayList<Integer> list = new ArrayList<>();
+ for (int x = 1; x <= 60; x++) {
+ list.add(x);
+ }
+
+ for (Integer i : sink.values) {
+ list.remove(i);
+ }
+ Assert.assertTrue("The following ID's where not found in the result list: " + list.toString(), list.isEmpty());
+ Assert.assertTrue("The sink emitted to many values: " + (sink.values.size() - 60), sink.values.size() == 60);
+ }
+
+ @Override
+ protected void verifyResultsDataDiscardingUponRestore(
+ OneInputStreamTaskTestHarness<Tuple1<Integer>, Tuple1<Integer>> harness,
+ OneInputStreamTask<Tuple1<Integer>, Tuple1<Integer>> task, ListSink sink) {
+
+ ArrayList<Integer> list = new ArrayList<>();
+ for (int x = 1; x <= 20; x++) {
+ list.add(x);
+ }
+ for (int x = 41; x <= 60; x++) {
+ list.add(x);
+ }
+
+ for (Integer i : sink.values) {
+ list.remove(i);
+ }
+ Assert.assertTrue("The following ID's where not found in the result list: " + list.toString(), list.isEmpty());
+ Assert.assertTrue("The sink emitted to many values: " + (sink.values.size() - 40), sink.values.size() == 40);
+ }
+
+ /**
+ * Simple sink that stores all records in a public list.
+ */
+ public static class ListSink extends GenericAtLeastOnceSink<Tuple1<Integer>> {
+ public List<Integer> values = new ArrayList<>();
+
+ public ListSink() throws Exception {
+ super(new SimpleCommitter(), TypeExtractor.getForObject(new Tuple1<>(1)).createSerializer(new ExecutionConfig()), "job");
+ }
+
+ @Override
+ protected void sendValue(Iterable<Tuple1<Integer>> values, long timestamp) throws Exception {
+ for (Tuple1<Integer> value : values) {
+ this.values.add(value.f0);
+ }
+ }
+ }
+
+ public static class SimpleCommitter extends CheckpointCommitter {
+ private List<Long> checkpoints;
+
+ @Override
+ public void open() throws Exception {
+ checkpoints = new ArrayList<>();
+ }
+
+ @Override
+ public void close() throws Exception {
+ checkpoints.clear();
+ checkpoints = null;
+ }
+
+ @Override
+ public void createResource() throws Exception {
+ }
+
+ @Override
+ public void commitCheckpoint(long checkpointID) {
+ checkpoints.add(checkpointID);
+ }
+
+ @Override
+ public boolean isCheckpointCommitted(long checkpointID) {
+ return checkpoints.contains(checkpointID);
+ }
+ }
+}