You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/06/16 14:58:31 UTC
[2/4] flink git commit: [FLINK-3332] Address pull request review
comments
[FLINK-3332] Address pull request review comments
This closes #1771
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b63e19b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b63e19b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b63e19b0
Branch: refs/heads/master
Commit: b63e19b090cd9a1558db4c68c385e0ce947789e9
Parents: 5930622
Author: zentol <ch...@apache.org>
Authored: Wed Mar 9 18:20:43 2016 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 16 16:58:04 2016 +0200
----------------------------------------------------------------------
.travis.yml | 2 +-
docs/apis/streaming/connectors/cassandra.md | 30 ++-
.../api/writer/ResultPartitionWriter.java | 2 +-
.../flink-connector-cassandra/pom.xml | 3 +-
.../cassandra/CassandraInputFormat.java | 4 +-
.../cassandra/CassandraOutputFormat.java | 4 +-
.../cassandra/CassandraAtLeastOnceSink.java | 91 --------
.../cassandra/CassandraCommitter.java | 34 +--
.../CassandraIdempotentExactlyOnceSink.java | 116 ----------
.../cassandra/CassandraPojoAtLeastOnceSink.java | 61 -----
.../connectors/cassandra/CassandraPojoSink.java | 61 +++++
.../connectors/cassandra/CassandraSink.java | 89 ++++---
.../connectors/cassandra/CassandraSinkBase.java | 94 ++++++++
.../CassandraTupleAtLeastOnceSink.java | 59 -----
.../cassandra/CassandraTupleSink.java | 59 +++++
.../cassandra/CassandraTupleWriteAheadSink.java | 138 +++++++++++
.../connectors/cassandra/ClusterBuilder.java | 12 +
.../cassandra/example/BatchExample.java | 6 +
.../cassandra/CassandraConnectorTest.java | 128 +++++++---
...ssandraIdempotentExactlyOnceSinkExample.java | 88 -------
.../CassandraPojoAtLeastOnceSinkExample.java | 57 -----
.../example/CassandraPojoSinkExample.java | 62 +++++
.../CassandraTupleAtLeastOnceSinkExample.java | 59 -----
.../example/CassandraTupleSinkExample.java | 62 +++++
.../CassandraTupleWriteAheadSinkExample.java | 96 ++++++++
.../src/test/resources/cassandra.yaml | 6 +-
.../src/test/resources/log4j-test.properties | 4 +-
.../operators/GenericAtLeastOnceSink.java | 197 ----------------
.../operators/GenericWriteAheadSink.java | 216 +++++++++++++++++
.../operators/AtLeastOnceSinkTestBase.java | 218 -----------------
.../operators/GenericAtLeastOnceSinkTest.java | 147 ------------
.../operators/GenericWriteAheadSinkTest.java | 153 ++++++++++++
.../operators/WriteAheadSinkTestBase.java | 231 +++++++++++++++++++
.../runtime/tasks/StreamTaskTestHarness.java | 7 +
34 files changed, 1403 insertions(+), 1193 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index f6fdb78..57f4a52 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -25,7 +25,7 @@ matrix:
- jdk: "oraclejdk7" # this uploads the Hadoop 2 build to Maven and S3
env: PROFILE="-Dhadoop.version=2.3.0"
- jdk: "openjdk7" # this uploads the Hadoop 1 build to Maven and S3
- env: PROFILE="-Dhadoop.profile=1"
+ env: PROFILE="-Dhadoop.profile=1"
git:
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/docs/apis/streaming/connectors/cassandra.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/cassandra.md b/docs/apis/streaming/connectors/cassandra.md
index 2497a8e..18732fb 100644
--- a/docs/apis/streaming/connectors/cassandra.md
+++ b/docs/apis/streaming/connectors/cassandra.md
@@ -51,22 +51,34 @@ This method returns a CassandraSinkBuilder, which offers methods to further conf
The following configuration methods can be used:
1. setQuery(String query)
-2. setConsistencyLevel(ConsistencyLevel level)
+2. setHost(String host[, int port])
3. setClusterBuilder(ClusterBuilder builder)
-4. setCheckpointCommitter(CheckpointCommitter committer)
+4. enableWriteAheadLog([CheckpointCommitter committer])
5. build()
setQuery() sets the query that is executed for every value the sink receives.
-setConsistencyLevel() sets the desired consistency level (AT_LEAST_ONCE / EXACTLY_ONCE).
-setClusterBuilder() sets the cluster builder that is used to configure the connection to cassandra.
-setCheckpointCommitter() is an optional method for EXACTLY_ONCE processing.
+setHost() sets the cassandra host/port to connect to. This method is intended for simple use-cases.
+setClusterBuilder() sets the cluster builder that is used to configure the connection to cassandra. The setHost() functionality can be subsumed with this method.
+enableWriteAheadLog() is an optional method that allows exactly-once processing for non-deterministic algorithms.
A checkpoint committer stores additional information about completed checkpoints
-in some resource. You can use a `CassandraCommitter` to store these in a separate
-table in cassandra. Note that this table will NOT be cleaned up by Flink.
+in some resource. This information is used to prevent a full replay of the last
+completed checkpoint in case of a failure.
+You can use a `CassandraCommitter` to store these in a separate table in cassandra.
+Note that this table will NOT be cleaned up by Flink.
build() finalizes the configuration and returns the CassandraSink.
+Flink can provide exactly-once guarantees if the query is idempotent (meaning it can be applied multiple
+times without changing the result) and checkpointing is enabled. In case of a failure the failed
+checkpoint will be replayed completely.
+
+Furthermore, for non-deterministic programs the write-ahead log has to be enabled. For such a program
+the replayed checkpoint may be completely different than the previous attempt, which may leave the
+database in an inconsistent state since part of the first attempt may already be written.
+The write-ahead log guarantees that the replayed checkpoint is identical to the first attempt.
+Note that enabling this feature will have an adverse impact on latency.
+
Example:
<div class="codetabs" markdown="1">
@@ -74,8 +86,6 @@ Example:
{% highlight java %}
CassandraSink.addSink(input)
.setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
- .setConsistencyLevel(CassandraSink.ConsistencyLevel.EXACTLY_ONCE)
- .setCheckpointCommitter(new CassandraCommitter())
.setClusterBuilder(new ClusterBuilder() {
@Override
public Cluster buildCluster(Cluster.Builder builder) {
@@ -89,8 +99,6 @@ CassandraSink.addSink(input)
{% highlight scala %}
CassandraSink.addSink(input)
.setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
- .setConsistencyLevel(CassandraSink.ConsistencyLevel.EXACTLY_ONCE)
- .setCheckpointCommitter(new CassandraCommitter())
.setClusterBuilder(new ClusterBuilder() {
@Override
public Cluster buildCluster(Cluster.Builder builder) {
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index cfab34d..79c21c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -37,7 +37,7 @@ import java.io.IOException;
* The {@link ResultPartitionWriter} is the runtime API for producing results. It
* supports two kinds of data to be sent: buffers and events.
*/
-public class ResultPartitionWriter implements EventListener<TaskEvent> {
+public final class ResultPartitionWriter implements EventListener<TaskEvent> {
private final ResultPartition partition;
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/pom.xml b/flink-streaming-connectors/flink-connector-cassandra/pom.xml
index 9d8b24d..bfd712b 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-streaming-connectors/flink-connector-cassandra/pom.xml
@@ -49,6 +49,7 @@ under the License.
<configuration>
<reuseForks>true</reuseForks>
<forkCount>1</forkCount>
+ <argLine>-Xms256m -Xmx2800m -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
</configuration>
</plugin>
<plugin>
@@ -126,7 +127,7 @@ under the License.
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>16.0.1</version>
+ <version>${guava.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
index 0c027dc..6818288 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
@@ -117,13 +117,13 @@ public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT
try {
session.close();
} catch (Exception e) {
- LOG.info("Inputformat couldn't be closed - " + e.getMessage());
+ LOG.info("Inputformat couldn't be closed." + e.getMessage(), e);
}
try {
cluster.close();
} catch (Exception e) {
- LOG.info("Inputformat couldn't be closed - " + e.getMessage());
+ LOG.info("Inputformat couldn't be closed." + e.getMessage(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
index 433cfba..116db89 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
@@ -111,13 +111,13 @@ public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<O
try {
session.close();
} catch (Exception e) {
- LOG.info("Inputformat couldn't be closed - " + e.getMessage());
+ LOG.warn("Inputformat couldn't be closed.", e);
}
try {
cluster.close();
} catch (Exception e) {
- LOG.info("Inputformat couldn't be closed - " + e.getMessage());
+ LOG.warn("Inputformat couldn't be closed." , e);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAtLeastOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAtLeastOnceSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAtLeastOnceSink.java
deleted file mode 100644
index 602ec74..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAtLeastOnceSink.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * CassandraAtLeastOnceSink is the common abstract class of {@link CassandraPojoAtLeastOnceSink} and {@link CassandraTupleAtLeastOnceSink}.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public abstract class CassandraAtLeastOnceSink<IN, V> extends RichSinkFunction<IN> {
- protected static final Logger LOG = LoggerFactory.getLogger(CassandraAtLeastOnceSink.class);
- protected transient Cluster cluster;
- protected transient Session session;
-
- protected transient Throwable exception = null;
- protected transient FutureCallback<V> callback;
-
- private final ClusterBuilder builder;
-
- protected CassandraAtLeastOnceSink(ClusterBuilder builder) {
- this.builder = builder;
- }
-
- @Override
- public void open(Configuration configuration) {
- this.callback = new FutureCallback<V>() {
- @Override
- public void onSuccess(V ignored) {
- }
-
- @Override
- public void onFailure(Throwable t) {
- exception = t;
- }
- };
- this.cluster = builder.getCluster();
- this.session = cluster.connect();
- }
-
- @Override
- public void invoke(IN value) throws Exception {
- if (exception != null) {
- throw new IOException("invoke() failed", exception);
- }
- ListenableFuture<V> result = send(value);
- Futures.addCallback(result, callback);
- }
-
- public abstract ListenableFuture<V> send(IN value);
-
- @Override
- public void close() {
- try {
- session.close();
- } catch (Exception e) {
- LOG.error("Error while closing session.", e);
- }
- try {
- cluster.close();
- } catch (Exception e) {
- LOG.error("Error while closing cluster.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
index c5d47e7..caab188 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
@@ -34,18 +34,24 @@ public class CassandraCommitter extends CheckpointCommitter {
private transient Cluster cluster;
private transient Session session;
- private static final String KEYSPACE = "flink_auxiliary";
- private String TABLE = "checkpoints_";
+ private String keySpace = "flink_auxiliary";
+ private String table = "checkpoints_";
- private transient PreparedStatement deleteStatement;
private transient PreparedStatement updateStatement;
private transient PreparedStatement selectStatement;
+ private long lastCommittedCheckpointID = -1;
+
public CassandraCommitter(ClusterBuilder builder) {
this.builder = builder;
ClosureCleaner.clean(builder, true);
}
+ public CassandraCommitter(ClusterBuilder builder, String keySpace) {
+ this(builder);
+ this.keySpace = keySpace;
+ }
+
/**
* Internally used to set the job ID after instantiation.
*
@@ -54,7 +60,7 @@ public class CassandraCommitter extends CheckpointCommitter {
*/
public void setJobId(String id) throws Exception {
super.setJobId(id);
- TABLE += id;
+ table += id;
}
/**
@@ -68,8 +74,8 @@ public class CassandraCommitter extends CheckpointCommitter {
cluster = builder.getCluster();
session = cluster.connect();
- session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", KEYSPACE));
- session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", KEYSPACE, TABLE));
+ session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':1};", keySpace));
+ session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table));
try {
session.close();
@@ -91,16 +97,15 @@ public class CassandraCommitter extends CheckpointCommitter {
cluster = builder.getCluster();
session = cluster.connect();
- deleteStatement = session.prepare(String.format("DELETE FROM %s.%s where sink_id='%s' and sub_id=%d;", KEYSPACE, TABLE, operatorId, subtaskId));
- updateStatement = session.prepare(String.format("UPDATE %s.%s set checkpoint_id=? where sink_id='%s' and sub_id=%d;", KEYSPACE, TABLE, operatorId, subtaskId));
- selectStatement = session.prepare(String.format("SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", KEYSPACE, TABLE, operatorId, subtaskId));
+ updateStatement = session.prepare(String.format("UPDATE %s.%s set checkpoint_id=? where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
+ selectStatement = session.prepare(String.format("SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
- session.execute(String.format("INSERT INTO %s.%s (sink_id, sub_id, checkpoint_id) values ('%s', %d, " + -1 + ");", KEYSPACE, TABLE, operatorId, subtaskId));
+ session.execute(String.format("INSERT INTO %s.%s (sink_id, sub_id, checkpoint_id) values ('%s', %d, " + -1 + ") IF NOT EXISTS;", keySpace, table, operatorId, subtaskId));
}
@Override
public void close() throws Exception {
- session.executeAsync(deleteStatement.bind());
+ this.lastCommittedCheckpointID = -1;
try {
session.close();
} catch (Exception e) {
@@ -116,11 +121,14 @@ public class CassandraCommitter extends CheckpointCommitter {
@Override
public void commitCheckpoint(long checkpointID) {
session.execute(updateStatement.bind(checkpointID));
+ this.lastCommittedCheckpointID = checkpointID;
}
@Override
public boolean isCheckpointCommitted(long checkpointID) {
- long lastId = session.execute(selectStatement.bind()).one().getLong("checkpoint_id");
- return checkpointID <= lastId;
+ if (this.lastCommittedCheckpointID == -1) {
+ this.lastCommittedCheckpointID = session.execute(selectStatement.bind()).one().getLong("checkpoint_id");
+ }
+ return checkpointID <= this.lastCommittedCheckpointID;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraIdempotentExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraIdempotentExactlyOnceSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraIdempotentExactlyOnceSink.java
deleted file mode 100644
index b9de9c4..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraIdempotentExactlyOnceSink.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Session;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-import org.apache.flink.streaming.runtime.operators.GenericAtLeastOnceSink;
-
-/**
- * Sink that emits its input elements into a Cassandra database. This sink is integrated with the checkpointing
- * mechanism and provides exactly-once guarantees for idempotent updates.
- * <p/>
- * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
- * checkpoint is completed.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public class CassandraIdempotentExactlyOnceSink<IN extends Tuple> extends GenericAtLeastOnceSink<IN> {
- protected transient Cluster cluster;
- protected transient Session session;
-
- private final String insertQuery;
- private transient PreparedStatement preparedStatement;
-
- private transient Throwable exception = null;
- private transient FutureCallback<ResultSet> callback;
-
- private ClusterBuilder builder;
-
- protected CassandraIdempotentExactlyOnceSink(String insertQuery, TypeSerializer<IN> serializer, ClusterBuilder builder, String jobID, CheckpointCommitter committer) throws Exception {
- super(committer, serializer, jobID);
- this.insertQuery = insertQuery;
- this.builder = builder;
- ClosureCleaner.clean(builder, true);
- }
-
- public void open() throws Exception {
- super.open();
- this.callback = new FutureCallback<ResultSet>() {
- @Override
- public void onSuccess(ResultSet resultSet) {
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- exception = throwable;
- }
- };
- cluster = builder.getCluster();
- session = cluster.connect();
- preparedStatement = session.prepare(insertQuery);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- try {
- session.close();
- } catch (Exception e) {
- LOG.error("Error while closing session.", e);
- }
- try {
- cluster.close();
- } catch (Exception e) {
- LOG.error("Error while closing cluster.", e);
- }
- }
-
- @Override
- protected void sendValue(Iterable<IN> values, long timestamp) throws Exception {
- //verify that no query failed until now
- if (exception != null) {
- throw new Exception(exception);
- }
- //set values for prepared statement
- for (IN value : values) {
- Object[] fields = new Object[value.getArity()];
- for (int x = 0; x < value.getArity(); x++) {
- fields[x] = value.getField(x);
- }
- //insert values and send to cassandra
- BoundStatement s = preparedStatement.bind(fields);
- s.setDefaultTimestamp(timestamp);
- ResultSetFuture result = session.executeAsync(s);
- if (result != null) {
- //add callback to detect errors
- Futures.addCallback(result, callback);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoAtLeastOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoAtLeastOnceSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoAtLeastOnceSink.java
deleted file mode 100644
index 45ac61d..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoAtLeastOnceSink.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.mapping.Mapper;
-import com.datastax.driver.mapping.MappingManager;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * Flink Sink to save data into a Cassandra cluster using {@link Mapper}, which
- * it uses annotations from {@link com.datastax.driver.mapping}.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public class CassandraPojoAtLeastOnceSink<IN> extends CassandraAtLeastOnceSink<IN, Void> {
- protected Class<IN> clazz;
- protected transient Mapper<IN> mapper;
- protected transient MappingManager mappingManager;
-
- /**
- * The main constructor for creating CassandraPojoAtLeastOnceSink
- *
- * @param clazz Class<IN> instance
- */
- public CassandraPojoAtLeastOnceSink(Class<IN> clazz, ClusterBuilder builder) {
- super(builder);
- this.clazz = clazz;
- }
-
- @Override
- public void open(Configuration configuration) {
- super.open(configuration);
- try {
- this.mappingManager = new MappingManager(session);
- this.mapper = mappingManager.mapper(clazz);
- } catch (Exception e) {
- throw new RuntimeException("Cannot create CassandraPojoAtLeastOnceSink with input: " + clazz.getSimpleName(), e);
- }
- }
-
- @Override
- public ListenableFuture<Void> send(IN value) {
- return mapper.saveAsync(value);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
new file mode 100644
index 0000000..204a0f3
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Flink Sink to save data into a Cassandra cluster using {@link Mapper}, which
+ * it uses annotations from {@link com.datastax.driver.mapping}.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
+ protected Class<IN> clazz;
+ protected transient Mapper<IN> mapper;
+ protected transient MappingManager mappingManager;
+
+ /**
+ * The main constructor for creating CassandraPojoSink
+ *
+ * @param clazz Class<IN> instance
+ */
+ public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) {
+ super(builder);
+ this.clazz = clazz;
+ }
+
+ @Override
+ public void open(Configuration configuration) {
+ super.open(configuration);
+ try {
+ this.mappingManager = new MappingManager(session);
+ this.mapper = mappingManager.mapper(clazz);
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot create CassandraPojoSink with input: " + clazz.getSimpleName(), e);
+ }
+ }
+
+ @Override
+ public ListenableFuture<Void> send(IN value) {
+ return mapper.saveAsync(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index e7804f4..c01f392 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -30,15 +30,12 @@ import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-import java.util.UUID;
-
/**
* This class wraps different Cassandra sink implementations to provide a common interface for all of them.
*
* @param <IN> input type
*/
public class CassandraSink<IN> {
- private static final String jobID = UUID.randomUUID().toString().replace("-", "_");
private final boolean useDataStreamSink;
private DataStreamSink<IN> sink1;
private SingleOutputStreamOperator<IN> sink2;
@@ -170,19 +167,14 @@ public class CassandraSink<IN> {
}
}
- public enum ConsistencyLevel {
- At_LEAST_ONCE,
- EXACTLY_ONCE
- }
-
public abstract static class CassandraSinkBuilder<IN> {
protected final DataStream<IN> input;
protected final TypeSerializer<IN> serializer;
protected final TypeInformation<IN> typeInfo;
- protected ConsistencyLevel consistency = ConsistencyLevel.At_LEAST_ONCE;
protected ClusterBuilder builder;
protected String query;
protected CheckpointCommitter committer;
+ protected boolean isWriteAheadLogEnabled;
public CassandraSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
this.input = input;
@@ -192,7 +184,6 @@ public class CassandraSink<IN> {
/**
* Sets the query that is to be executed for every record.
- * This parameter is mandatory.
*
* @param query query to use
* @return this builder
@@ -202,12 +193,28 @@ public class CassandraSink<IN> {
return this;
}
+ /**
+ * Sets the cassandra host to connect to.
+ *
+ * @param host host to connect to
+ * @return this builder
+ */
public CassandraSinkBuilder<IN> setHost(String host) {
return setHost(host, 9042);
}
+ /**
+ * Sets the cassandra host/port to connect to.
+ *
+ * @param host host to connect to
+ * @param port port to connect to
+ * @return this builder
+ */
public CassandraSinkBuilder<IN> setHost(final String host, final int port) {
- builder = new ClusterBuilder() {
+ if (this.builder != null) {
+ throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder().");
+ }
+ this.builder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint(host).withPort(port).build();
@@ -217,39 +224,40 @@ public class CassandraSink<IN> {
}
/**
- * Specifies the desired consistency level for this sink. Different sink implementations may be used depending
- * on this parameter.
- * This parameter is mandatory.
+ * Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the connection to cassandra.
*
- * @param consistency desired consistency level
+ * @param builder ClusterBuilder to configure the connection to cassandra
* @return this builder
*/
- public CassandraSinkBuilder<IN> setConsistencyLevel(ConsistencyLevel consistency) {
- this.consistency = consistency;
+ public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) {
+ if (this.builder != null) {
+ throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder().");
+ }
+ this.builder = builder;
return this;
}
/**
- * Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the connection to cassandra.
- * This field is mandatory.
+ * Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use
+ * idempotent updates.
*
- * @param builder ClusterBuilder to configure the connection to cassandra
* @return this builder
*/
- public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) {
- this.builder = builder;
+ public CassandraSinkBuilder<IN> enableWriteAheadLog() {
+ this.isWriteAheadLogEnabled = true;
return this;
}
/**
- * Sets the CheckpointCommitter for this sink. A CheckpointCommitter stores information about completed checkpoints
- * in a resource outside of Flink.
- * If the desired consistency level is EXACTLY_ONCE and this field is not set, a default committer will be used.
+ * Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use
+ * idempotent updates.
*
- * @param committer
+ * @param committer CheckpointCommitter that stores information about completed checkpoints in an external
+ * resource. By default this information is stored within a separate table within Cassandra.
* @return this builder
*/
- public CassandraSinkBuilder<IN> setCheckpointCommitter(CheckpointCommitter committer) {
+ public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer) {
+ this.isWriteAheadLogEnabled = true;
this.committer = committer;
return this;
}
@@ -261,6 +269,15 @@ public class CassandraSink<IN> {
* @throws Exception
*/
public abstract CassandraSink<IN> build() throws Exception;
+
+ protected void sanityCheck() {
+ if (query == null || query.length() == 0) {
+ throw new IllegalArgumentException("Query must not be null or empty.");
+ }
+ if (builder == null) {
+ throw new IllegalArgumentException("Cassandra host information must be supplied using either setHost() or setClusterBuilder().");
+ }
+ }
}
public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
@@ -270,12 +287,13 @@ public class CassandraSink<IN> {
@Override
public CassandraSink<IN> build() throws Exception {
- if (consistency == ConsistencyLevel.EXACTLY_ONCE) {
+ sanityCheck();
+ if (isWriteAheadLogEnabled) {
return committer == null
- ? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraIdempotentExactlyOnceSink<>(query, serializer, builder, jobID, new CassandraCommitter(builder))))
- : new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraIdempotentExactlyOnceSink<>(query, serializer, builder, jobID, committer)));
+ ? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, new CassandraCommitter(builder))))
+ : new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, committer)));
} else {
- return new CassandraSink<>(input.addSink(new CassandraTupleAtLeastOnceSink<IN>(query, builder)).name("Cassandra Sink"));
+ return new CassandraSink<>(input.addSink(new CassandraTupleSink<IN>(query, builder)).name("Cassandra Sink"));
}
}
}
@@ -287,13 +305,12 @@ public class CassandraSink<IN> {
@Override
public CassandraSink<IN> build() throws Exception {
- if (consistency == ConsistencyLevel.EXACTLY_ONCE) {
+ sanityCheck();
+ if (isWriteAheadLogEnabled) {
throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
+ } else {
+ return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink"));
}
- if (consistency == ConsistencyLevel.At_LEAST_ONCE) {
- return new CassandraSink<>(input.addSink(new CassandraPojoAtLeastOnceSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink"));
- }
- throw new IllegalArgumentException("No consistency level was specified.");
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
new file mode 100644
index 0000000..c823f5b
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
+ protected static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBase.class);
+ protected transient Cluster cluster;
+ protected transient Session session;
+
+ protected transient Throwable exception = null;
+ protected transient FutureCallback<V> callback;
+
+ private final ClusterBuilder builder;
+
+ protected CassandraSinkBase(ClusterBuilder builder) {
+ this.builder = builder;
+ ClosureCleaner.clean(builder, true);
+ }
+
+ @Override
+ public void open(Configuration configuration) {
+ this.callback = new FutureCallback<V>() {
+ @Override
+ public void onSuccess(V ignored) {
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ exception = t;
+ LOG.error("Error while sending value.", t);
+ }
+ };
+ this.cluster = builder.getCluster();
+ this.session = cluster.connect();
+ }
+
+ @Override
+ public void invoke(IN value) throws Exception {
+ if (exception != null) {
+ throw new IOException("invoke() failed", exception);
+ }
+ ListenableFuture<V> result = send(value);
+ Futures.addCallback(result, callback);
+ }
+
+ public abstract ListenableFuture<V> send(IN value);
+
+ @Override
+ public void close() {
+ try {
+ session.close();
+ } catch (Exception e) {
+ LOG.error("Error while closing session.", e);
+ }
+ try {
+ cluster.close();
+ } catch (Exception e) {
+ LOG.error("Error while closing cluster.", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleAtLeastOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleAtLeastOnceSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleAtLeastOnceSink.java
deleted file mode 100644
index 9d6daea..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleAtLeastOnceSink.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * Flink Sink to save data into a Cassandra cluster.
- *
- * @param <IN> Type of the elements emitted by this sink, it must extend {@link Tuple}
- */
-public class CassandraTupleAtLeastOnceSink<IN extends Tuple> extends CassandraAtLeastOnceSink<IN, ResultSet> {
- private final String insertQuery;
- private transient PreparedStatement ps;
-
- public CassandraTupleAtLeastOnceSink(String insertQuery, ClusterBuilder builder) {
- super(builder);
- this.insertQuery = insertQuery;
- }
-
- @Override
- public void open(Configuration configuration) {
- super.open(configuration);
- this.ps = session.prepare(insertQuery);
- }
-
- @Override
- public ListenableFuture<ResultSet> send(IN value) {
- Object[] fields = extract(value);
- return session.executeAsync(ps.bind(fields));
- }
-
- private Object[] extract(IN record) {
- Object[] al = new Object[record.getArity()];
- for (int i = 0; i < record.getArity(); i++) {
- al[i] = record.getField(i);
- }
- return al;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
new file mode 100644
index 0000000..0a9ef06
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Flink Sink to save data into a Cassandra cluster.
+ *
+ * @param <IN> Type of the elements emitted by this sink, it must extend {@link Tuple}
+ */
+public class CassandraTupleSink<IN extends Tuple> extends CassandraSinkBase<IN, ResultSet> {
+ private final String insertQuery;
+ private transient PreparedStatement ps;
+
+ public CassandraTupleSink(String insertQuery, ClusterBuilder builder) {
+ super(builder);
+ this.insertQuery = insertQuery;
+ }
+
+ @Override
+ public void open(Configuration configuration) {
+ super.open(configuration);
+ this.ps = session.prepare(insertQuery);
+ }
+
+ @Override
+ public ListenableFuture<ResultSet> send(IN value) {
+ Object[] fields = extract(value);
+ return session.executeAsync(ps.bind(fields));
+ }
+
+ private Object[] extract(IN record) {
+ Object[] al = new Object[record.getArity()];
+ for (int i = 0; i < record.getArity(); i++) {
+ al[i] = record.getField(i);
+ }
+ return al;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
new file mode 100644
index 0000000..f784647
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink stores incoming records within a
+ * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to cassandra
+ * if a checkpoint is completed.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {
+ protected transient Cluster cluster;
+ protected transient Session session;
+
+ private final String insertQuery;
+ private transient PreparedStatement preparedStatement;
+
+ private transient Throwable exception = null;
+ private transient FutureCallback<ResultSet> callback;
+
+ private ClusterBuilder builder;
+
+ private int updatesSent = 0;
+ private AtomicInteger updatesConfirmed = new AtomicInteger(0);
+
+ private transient Object[] fields;
+
+ protected CassandraTupleWriteAheadSink(String insertQuery, TypeSerializer<IN> serializer, ClusterBuilder builder, CheckpointCommitter committer) throws Exception {
+ super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
+ this.insertQuery = insertQuery;
+ this.builder = builder;
+ ClosureCleaner.clean(builder, true);
+ }
+
+ public void open() throws Exception {
+ super.open();
+ if (!getRuntimeContext().isCheckpointingEnabled()) {
+ throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled.");
+ }
+ this.callback = new FutureCallback<ResultSet>() {
+ @Override
+ public void onSuccess(ResultSet resultSet) {
+ updatesConfirmed.incrementAndGet();
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ exception = throwable;
+ LOG.error("Error while sending value.", throwable);
+ }
+ };
+ cluster = builder.getCluster();
+ session = cluster.connect();
+ preparedStatement = session.prepare(insertQuery);
+
+ fields = new Object[((TupleSerializer<IN>) serializer).getArity()];
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ try {
+ session.close();
+ } catch (Exception e) {
+ LOG.error("Error while closing session.", e);
+ }
+ try {
+ cluster.close();
+ } catch (Exception e) {
+ LOG.error("Error while closing cluster.", e);
+ }
+ }
+
+ @Override
+ protected void sendValues(Iterable<IN> values, long timestamp) throws Exception {
+ //verify that no query failed until now
+ if (exception != null) {
+ throw new Exception(exception);
+ }
+ //set values for prepared statement
+ for (IN value : values) {
+ for (int x = 0; x < value.getArity(); x++) {
+ fields[x] = value.getField(x);
+ }
+ //insert values and send to cassandra
+ BoundStatement s = preparedStatement.bind(fields);
+ s.setDefaultTimestamp(timestamp);
+ ResultSetFuture result = session.executeAsync(s);
+ updatesSent++;
+ if (result != null) {
+ //add callback to detect errors
+ Futures.addCallback(result, callback);
+ }
+ }
+ try {
+ while (updatesSent != updatesConfirmed.get()) {
+ Thread.sleep(100);
+ }
+ } catch (InterruptedException e) {
+ }
+ updatesSent = 0;
+ updatesConfirmed.set(0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
index d3db435..01458f4 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
@@ -21,11 +21,23 @@ import com.datastax.driver.core.Cluster;
import java.io.Serializable;
+/**
+ * This class is used to configure a {@link com.datastax.driver.core.Cluster} after deployment.
+ * The cluster represents the connection that will be established to Cassandra.
+ */
public abstract class ClusterBuilder implements Serializable {
public Cluster getCluster() {
return buildCluster(Cluster.builder());
}
+ /**
+ * Configures the connection to Cassandra.
+ * The configuration is done by calling methods on the builder object
+ * and finalizing the configuration with build().
+ *
+ * @param builder connection builder
+ * @return configured connection
+ */
protected abstract Cluster buildCluster(Cluster.Builder builder);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
index 396444a..e66b8b3 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
@@ -29,6 +29,12 @@ import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import java.util.ArrayList;
+/**
+ * This is an example showing how to use the Cassandra Input-/OutputFormats in the Batch API.
+ *
+ * The example assumes that a table exists in a local cassandra database, according to the following query:
+ * CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));
+ */
public class BatchExample {
private static final String INSERT_QUERY = "INSERT INTO test.batches (number, strings) VALUES (?,?);";
private static final String SELECT_QUERY = "SELECT number, strings FROM test.batches;";
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
index a4a23a1..83bb37a 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.streaming.connectors.cassandra;
import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
@@ -26,25 +28,33 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.runtime.operators.AtLeastOnceSinkTestBase;
+import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.File;
@@ -54,7 +64,11 @@ import java.util.ArrayList;
import java.util.Scanner;
import java.util.UUID;
-public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<String, Integer, Integer>, CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>>> {
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ResultPartitionWriter.class)
+@PowerMockIgnore("javax.management.*")
+public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorTest.class);
private static File tmpDir;
private static final boolean EMBEDDED = true;
@@ -62,7 +76,11 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
private transient static ClusterBuilder builder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
- return builder.addContactPoint("127.0.0.1").build();
+ return builder
+ .addContactPoint("127.0.0.1")
+ .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
+ .withoutJMXReporting()
+ .withoutMetrics().build();
}
};
private static Cluster cluster;
@@ -102,7 +120,7 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
public static void startCassandra() throws IOException {
//generate temporary files
tmpDir = CommonTestUtils.createTempDirectory();
- ClassLoader classLoader = CassandraIdempotentExactlyOnceSink.class.getClassLoader();
+ ClassLoader classLoader = CassandraTupleWriteAheadSink.class.getClassLoader();
File file = new File(classLoader.getResource("cassandra.yaml").getFile());
File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
tmp.createNewFile();
@@ -133,13 +151,24 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
} catch (InterruptedException e) { //give cassandra a few seconds to start up
}
- cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
+ cluster = builder.getCluster();
session = cluster.connect();
session.execute(CREATE_KEYSPACE_QUERY);
session.execute(CREATE_TABLE_QUERY);
}
+ @Before
+ public void checkIfIgnore() {
+ String runtime = System.getProperty("java.runtime.name");
+ String version = System.getProperty("java.runtime.version");
+ LOG.info("Running tests on runtime: '{}', version: '{}'", runtime, version);
+ // The tests are failing on Oracle JDK 7 on Travis due to garbage collection issues.
+ // Oracle JDK identifies itself as "Java(TM) SE Runtime Environment"
+ // OpenJDK is "OpenJDK Runtime Environment"
+ Assume.assumeFalse(runtime.startsWith("Java") && version.startsWith("1.7"));
+ }
+
@After
public void deleteSchema() throws Exception {
session.executeAsync(CLEAR_TABLE_QUERY);
@@ -158,23 +187,12 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
//=====Exactly-Once=================================================================================================
@Override
- protected CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>> createSink() {
- try {
- return new CassandraIdempotentExactlyOnceSink<>(
- INSERT_DATA_QUERY,
- TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
- new ClusterBuilder() {
- @Override
- protected Cluster buildCluster(Cluster.Builder builder) {
- return builder.addContactPoint("127.0.0.1").build();
- }
- },
- "testJob",
- new CassandraCommitter(builder));
- } catch (Exception e) {
- Assert.fail("Failure while initializing sink: " + e.getMessage());
- return null;
- }
+ protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
+ return new CassandraTupleWriteAheadSink<>(
+ INSERT_DATA_QUERY,
+ TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
+ builder,
+ new CassandraCommitter(builder));
}
@Override
@@ -191,7 +209,7 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
protected void verifyResultsIdealCircumstances(
OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
- CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>> sink) {
+ CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
ResultSet result = session.execute(SELECT_DATA_QUERY);
ArrayList<Integer> list = new ArrayList<>();
@@ -209,7 +227,7 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
protected void verifyResultsDataPersistenceUponMissedNotify(
OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
- CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>> sink) {
+ CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
ResultSet result = session.execute(SELECT_DATA_QUERY);
ArrayList<Integer> list = new ArrayList<>();
@@ -227,7 +245,7 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
protected void verifyResultsDataDiscardingUponRestore(
OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
- CassandraIdempotentExactlyOnceSink<Tuple3<String, Integer, Integer>> sink) {
+ CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
ResultSet result = session.execute(SELECT_DATA_QUERY);
ArrayList<Integer> list = new ArrayList<>();
@@ -244,6 +262,60 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
}
+ @Test
+ public void testCassandraCommitter() throws Exception {
+ CassandraCommitter cc1 = new CassandraCommitter(builder);
+ cc1.setJobId("job");
+ cc1.setOperatorId("operator");
+ cc1.setOperatorSubtaskId(0);
+
+ CassandraCommitter cc2 = new CassandraCommitter(builder);
+ cc2.setJobId("job");
+ cc2.setOperatorId("operator");
+ cc2.setOperatorSubtaskId(1);
+
+ CassandraCommitter cc3 = new CassandraCommitter(builder);
+ cc3.setJobId("job");
+ cc3.setOperatorId("operator1");
+ cc3.setOperatorSubtaskId(0);
+
+ cc1.createResource();
+
+ cc1.open();
+ cc2.open();
+ cc3.open();
+
+ Assert.assertFalse(cc1.isCheckpointCommitted(1));
+ Assert.assertFalse(cc2.isCheckpointCommitted(1));
+ Assert.assertFalse(cc3.isCheckpointCommitted(1));
+
+ cc1.commitCheckpoint(1);
+ Assert.assertTrue(cc1.isCheckpointCommitted(1));
+ //verify that other sub-tasks aren't affected
+ Assert.assertFalse(cc2.isCheckpointCommitted(1));
+ //verify that other tasks aren't affected
+ Assert.assertFalse(cc3.isCheckpointCommitted(1));
+
+ Assert.assertFalse(cc1.isCheckpointCommitted(2));
+
+ cc1.close();
+ cc2.close();
+ cc3.close();
+
+ cc1 = new CassandraCommitter(builder);
+ cc1.setJobId("job");
+ cc1.setOperatorId("operator");
+ cc1.setOperatorSubtaskId(0);
+
+ cc1.open();
+
+ //verify that checkpoint data is not destroyed within open/close and not reliant on internally cached data
+ Assert.assertTrue(cc1.isCheckpointCommitted(1));
+ Assert.assertFalse(cc1.isCheckpointCommitted(2));
+
+ cc1.close();
+ }
+
//=====At-Least-Once================================================================================================
@Test
public void testCassandraTupleAtLeastOnceSink() throws Exception {
@@ -251,7 +323,7 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
env.setParallelism(1);
DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection);
- source.addSink(new CassandraTupleAtLeastOnceSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
+ source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
env.execute();
@@ -287,7 +359,7 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
}
});
- source.addSink(new CassandraPojoAtLeastOnceSink<>(Pojo.class, builder));
+ source.addSink(new CassandraPojoSink<>(Pojo.class, builder));
env.execute();
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraIdempotentExactlyOnceSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraIdempotentExactlyOnceSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraIdempotentExactlyOnceSinkExample.java
deleted file mode 100644
index 5a0b299..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraIdempotentExactlyOnceSinkExample.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra.example;
-
-import com.datastax.driver.core.Cluster;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-import java.util.UUID;
-
-public class CassandraIdempotentExactlyOnceSinkExample {
- public static void main(String[] args) throws Exception {
-
- class MySource implements SourceFunction<Tuple2<String, Integer>>, Checkpointed<Integer> {
- private int counter = 0;
- private boolean stop = false;
-
- @Override
- public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
- while (!stop) {
- Thread.sleep(50);
- ctx.collect(new Tuple2<>("" + UUID.randomUUID(), 1));
- counter++;
- if (counter == 100) {
- stop = true;
- }
- }
- }
-
- @Override
- public void cancel() {
- stop = true;
- }
-
- @Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return counter;
- }
-
- @Override
- public void restoreState(Integer state) throws Exception {
- this.counter = state;
- }
- }
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
- env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend"));
-
- CassandraSink<Tuple2<String, Integer>> sink = CassandraSink.addSink(env.addSource(new MySource()))
- .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
- .setConsistencyLevel(CassandraSink.ConsistencyLevel.EXACTLY_ONCE)
- .setClusterBuilder(new ClusterBuilder() {
- @Override
- public Cluster buildCluster(Cluster.Builder builder) {
- return builder.addContactPoint("127.0.0.1").build();
- }
- })
- .build();
-
- sink.name("Cassandra Sink").disableChaining().setParallelism(1).uid("hello");
-
- env.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoAtLeastOnceSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoAtLeastOnceSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoAtLeastOnceSinkExample.java
deleted file mode 100644
index 8c116c7..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoAtLeastOnceSinkExample.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra.example;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-import java.util.ArrayList;
-
-public class CassandraPojoAtLeastOnceSinkExample {
- private static final ArrayList<Message> messages = new ArrayList<>(20);
-
- static {
- for (long i = 0; i < 20; i++) {
- messages.add(new Message("cassandra-" + i));
- }
- }
-
- /*
- * create table: "CREATE TABLE IF NOT EXISTS test.message(body txt PRIMARY KEY);"
- */
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Message> source = env.fromCollection(messages);
-
- CassandraSink.addSink(source)
- .setClusterBuilder(new ClusterBuilder() {
- @Override
- protected Cluster buildCluster(Builder builder) {
- return builder.addContactPoint("127.0.0.1").build();
- }
- })
- .build();
-
- env.execute("Cassandra Sink example");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
new file mode 100644
index 0000000..e1bcea9
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra.example;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing how to use the Pojo Cassandra Sink in the Streaming API.
+ *
+ * Pojos have to be annotated with datastax annotations to work with this sink.
+ *
+ * The example assumes that a table exists in a local cassandra database, according to the following query:
+ * CREATE TABLE IF NOT EXISTS test.message(body text PRIMARY KEY)
+ */
+public class CassandraPojoSinkExample {
+ private static final ArrayList<Message> messages = new ArrayList<>(20);
+
+ static {
+ for (long i = 0; i < 20; i++) {
+ messages.add(new Message("cassandra-" + i));
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<Message> source = env.fromCollection(messages);
+
+ CassandraSink.addSink(source)
+ .setClusterBuilder(new ClusterBuilder() {
+ @Override
+ protected Cluster buildCluster(Builder builder) {
+ return builder.addContactPoint("127.0.0.1").build();
+ }
+ })
+ .build();
+
+ env.execute("Cassandra Sink example");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleAtLeastOnceSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleAtLeastOnceSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleAtLeastOnceSinkExample.java
deleted file mode 100644
index 25eb5c2..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleAtLeastOnceSinkExample.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra.example;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-import java.util.ArrayList;
-
-public class CassandraTupleAtLeastOnceSinkExample {
- private static final String INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)";
- private static final ArrayList<Tuple2<String, Integer>> collection = new ArrayList<>(20);
-
- static {
- for (int i = 0; i < 20; i++) {
- collection.add(new Tuple2<>("cassandra-" + i, i));
- }
- }
-
- /*
- * table script: "CREATE TABLE IF NOT EXISTS test.writetuple(element1 text PRIMARY KEY, element2 int)"
- */
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(collection);
-
- CassandraSink.addSink(source)
- .setQuery(INSERT)
- .setClusterBuilder(new ClusterBuilder() {
- @Override
- protected Cluster buildCluster(Builder builder) {
- return builder.addContactPoint("127.0.0.1").build();
- }
- })
- .build();
-
- env.execute("WriteTupleIntoCassandra");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b63e19b0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
new file mode 100644
index 0000000..c6345df
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra.example;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing how to use the Tuple Cassandra Sink in the Streaming API.
+ *
+ * The example assumes that a table exists in a local cassandra database, according to the following query:
+ * CREATE TABLE IF NOT EXISTS test.writetuple(element1 text PRIMARY KEY, element2 int)
+ */
+public class CassandraTupleSinkExample {
+ private static final String INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)";
+ private static final ArrayList<Tuple2<String, Integer>> collection = new ArrayList<>(20);
+
+ static {
+ for (int i = 0; i < 20; i++) {
+ collection.add(new Tuple2<>("cassandra-" + i, i));
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(collection);
+
+ CassandraSink.addSink(source)
+ .setQuery(INSERT)
+ .setClusterBuilder(new ClusterBuilder() {
+ @Override
+ protected Cluster buildCluster(Builder builder) {
+ return builder.addContactPoint("127.0.0.1").build();
+ }
+ })
+ .build();
+
+ env.execute("WriteTupleIntoCassandra");
+ }
+}