Posted to commits@flink.apache.org by ch...@apache.org on 2017/10/10 12:40:59 UTC

flink git commit: [FLINK-7632] [docs] Overhaul Cassandra connector docs

Repository: flink
Updated Branches:
  refs/heads/master 83b6ba704 -> bbd8eee34


[FLINK-7632] [docs] Overhaul Cassandra connector docs

Refactor the current documentation of the Cassandra Sink with more in-depth information.

Provides examples for POJO and Java Tuple data types

This closes #4696.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbd8eee3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbd8eee3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbd8eee3

Branch: refs/heads/master
Commit: bbd8eee34f4e3b25f7bdb663288d423ea22127b9
Parents: 83b6ba7
Author: Michael Fong <mc...@gmail.com>
Authored: Sat Sep 16 23:23:30 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Tue Oct 10 14:40:14 2017 +0200

----------------------------------------------------------------------
 docs/dev/connectors/cassandra.md | 258 +++++++++++++++++++++++++---------
 1 file changed, 193 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bbd8eee3/docs/dev/connectors/cassandra.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/cassandra.md b/docs/dev/connectors/cassandra.md
index 12b7ce7..cfb31b3 100644
--- a/docs/dev/connectors/cassandra.md
+++ b/docs/dev/connectors/cassandra.md
@@ -23,7 +23,15 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This connector provides sinks that writes data into a [Cassandra](https://cassandra.apache.org/) database.
+* This will be replaced by the TOC
+{:toc}
+
+
+This connector provides sinks that write data into an [Apache Cassandra](https://cassandra.apache.org/) database.
+
+<!--
+  TODO: Perhaps worth mentioning the current DataStax Java Driver version to match the Cassandra version on the user side.
+-->
 
 To use this connector, add the following dependency to your project:
 
@@ -35,28 +43,39 @@ To use this connector, add the following dependency to your project:
 </dependency>
 {% endhighlight %}
 
-Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
+Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
 
-#### Installing Apache Cassandra
-Follow the instructions from the [Cassandra Getting Started page](http://wiki.apache.org/cassandra/GettingStarted).
+## Installing Apache Cassandra
+There are multiple ways to bring up a Cassandra instance on a local machine:
 
-#### Cassandra Sink
+1. Follow the instructions from the [Cassandra Getting Started page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
+2. Launch a container running Cassandra from the [Official Docker Repository](https://hub.docker.com/_/cassandra/), as shown below.
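+
+For example, the official Docker image can be started as follows (the container name is arbitrary; port 9042 is Cassandra's default native transport port):
+
+{% highlight bash %}
+docker run -d --name flink-example-cassandra -p 9042:9042 cassandra
+{% endhighlight %}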
+
+## Cassandra Sinks
+
+### Configurations
 
 Flink's Cassandra sinks are created by using the static CassandraSink.addSink(DataStream<IN> input) method.
-This method returns a CassandraSinkBuilder, which offers methods to further configure the sink.
+This method returns a CassandraSinkBuilder, which offers methods to further configure the sink and finally `build()` the sink instance.
 
 The following configuration methods can be used:
 
-1. setQuery(String query)
-2. setHost(String host[, int port])
-3. setClusterBuilder(ClusterBuilder builder)
-4. enableWriteAheadLog([CheckpointCommitter committer])
-5. build()
-
-*setQuery()* sets the query that is executed for every value the sink receives.
-*setHost()* sets the cassandra host/port to connect to. This method is intended for simple use-cases.
-*setClusterBuilder()* sets the cluster builder that is used to configure the connection to cassandra. The *setHost()* functionality can be subsumed with this method.
-*enableWriteAheadLog()* is an optional method, that allows exactly-once processing for non-deterministic algorithms.
+1. _setQuery(String query)_
+    * Sets the upsert query that is executed for every record the sink receives.
+    * The query is internally treated as a CQL statement.
+    * __DO__ set the upsert query for processing __Tuple__ data types.
+    * __DO NOT__ set the query for processing __POJO__ data types.
+2. _setClusterBuilder()_
+    * Sets the cluster builder that is used to configure the connection to Cassandra with more sophisticated settings such as consistency level and retry policy (see the sketch after this list).
+3. _setHost(String host[, int port])_
+    * Simple version of setClusterBuilder() with host/port information to connect to Cassandra instances.
+4. _enableWriteAheadLog([CheckpointCommitter committer])_
+    * An __optional__ setting
+    * Allows exactly-once processing for non-deterministic algorithms.
+5. _build()_
+    * Finalizes the configuration and constructs the CassandraSink instance.
+
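+A minimal sketch of a fully configured sink for Tuple input, using `setClusterBuilder()`; the contact point, consistency level, and retry policy below are example values, not requirements, and `input` stands for any DataStream of matching Tuples:
+
+{% highlight java %}
+CassandraSink.addSink(input)
+    .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
+    .setClusterBuilder(new ClusterBuilder() {
+        @Override
+        public Cluster buildCluster(Cluster.Builder builder) {
+            return builder
+                .addContactPoint("127.0.0.1")
+                // examples of the "more sophisticated settings" mentioned above
+                .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM))
+                .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
+                .build();
+        }
+    })
+    .build();
+{% endhighlight %}
+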
+### Write-ahead Log
 
 A checkpoint committer stores additional information about completed checkpoints
 in some resource. This information is used to prevent a full replay of the last
@@ -64,93 +83,202 @@ completed checkpoint in case of a failure.
 You can use a `CassandraCommitter` to store these in a separate table in Cassandra.
 Note that this table will NOT be cleaned up by Flink.
 
-*build()* finalizes the configuration and returns the CassandraSink.
-
 Flink can provide exactly-once guarantees if the query is idempotent (meaning it can be applied multiple
 times without changing the result) and checkpointing is enabled. In case of a failure the failed
 checkpoint will be replayed completely.
 
 Furthermore, for non-deterministic programs the write-ahead log has to be enabled. For such a program
 the replayed checkpoint may be completely different than the previous attempt, which may leave the
-database in an inconsitent state since part of the first attempt may already be written.
+database in an inconsistent state since part of the first attempt may already be written.
 The write-ahead log guarantees that the replayed checkpoint is identical to the first attempt.
 Note that enabling this feature will have an adverse impact on latency.
 
 <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficient to use the connector without enabling it. Please report problems to the development mailing list.</p>
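+
+A minimal sketch of enabling the write-ahead log on the builder; per the signature above, the `CheckpointCommitter` argument is optional, and a `CassandraCommitter` can be supplied explicitly to store checkpoint information in a dedicated Cassandra table:
+
+{% highlight java %}
+CassandraSink.addSink(result)
+    .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
+    .setHost("127.0.0.1")
+    // without an argument, the default checkpoint committer is used
+    .enableWriteAheadLog()
+    .build();
+{% endhighlight %}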
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once delivery of action requests to the Cassandra (C*) instance.
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: However, the current Cassandra Sink implementation does not flush pending mutations before a checkpoint is triggered. Thus, some in-flight mutations might not be replayed when the job recovers.</p>
+
+More details can be found in the [checkpointing docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantees docs]({{ site.baseurl }}/dev/connectors/guarantees.html).
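+
+A minimal sketch of enabling checkpointing on the execution environment (the 5000 ms interval is an arbitrary example value):
+
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// draw a consistent snapshot of the streaming job every 5 seconds
+env.enableCheckpointing(5000);
+{% endhighlight %}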
+
+## Examples
+
+The Cassandra sinks currently support both Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use cases of those streaming data types, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for POJO and Tuple data types respectively.
+
+In all these examples, we assume the associated Keyspace `example` and Table `wordcount` have been created.
+
+<div class="codetabs" markdown="1">
+<div data-lang="CQL" markdown="1">
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+    word text,
+    count bigint,
+    PRIMARY KEY(word)
+    );
+{% endhighlight %}
+</div>
+</div>
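+
+Alternatively to running these statements in `cqlsh`, they can be executed programmatically; a minimal sketch using the DataStax Java Driver, assuming a locally running Cassandra instance and the driver on the classpath:
+
+{% highlight java %}
+Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
+Session session = cluster.connect();
+session.execute(
+    "CREATE KEYSPACE IF NOT EXISTS example WITH replication = " +
+    "{'class': 'SimpleStrategy', 'replication_factor': '1'};");
+session.execute(
+    "CREATE TABLE IF NOT EXISTS example.wordcount (" +
+    "word text, count bigint, PRIMARY KEY(word));");
+cluster.close();
+{% endhighlight %}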
+
+### Cassandra Sink Example for Streaming Tuple Data Type
+While storing the result with the Java/Scala Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via `setQuery()`) to persist each record back to the database. With the upsert query cached as a `PreparedStatement`, each Tuple element is converted to a parameter of the statement.
 
-#### Example
+For details about `PreparedStatement` and `BoundStatement`, please visit the [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
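+
+Conceptually, the sink prepares the query once and then binds the fields of each incoming Tuple to the statement's parameters, roughly as in the following sketch (given an open driver `Session`; an illustration, not the sink's actual internal code):
+
+{% highlight java %}
+PreparedStatement prepared = session.prepare(
+    "INSERT INTO example.wordcount(word, count) values (?, ?);");
+Tuple2<String, Long> record = new Tuple2<>("hello", 1L);
+// bind the tuple fields positionally to the statement parameters
+session.execute(prepared.bind(record.f0, record.f1));
+{% endhighlight %}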
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-    @Override
-    public Cluster buildCluster(Cluster.Builder builder) {
-      return builder.addContactPoint("127.0.0.1").build();
-    }
-  })
-  .build();
+// get the execution environment
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream<String> text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream<Tuple2<String, Long>> result = text
+        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
+            @Override
+            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
+                // normalize and split the line
+                String[] words = value.toLowerCase().split("\\s");
+
+                // emit the pairs
+                for (String word : words) {
+                    //Do not accept empty word, since word is defined as primary key in C* table
+                    if (!word.isEmpty()) {
+                        out.collect(new Tuple2<String, Long>(word, 1L));
+                    }
+                }
+            }
+        })
+        .keyBy(0)
+        .timeWindow(Time.seconds(5))
+        .sum(1);
+
+CassandraSink.addSink(result)
+        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
+        .setHost("127.0.0.1")
+        .build();
 {% endhighlight %}
 </div>
+
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-    override def buildCluster(builder: Cluster.Builder): Cluster = {
-      builder.addContactPoint("127.0.0.1").build()
-    }
-  })
+val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+// get input data by connecting to the socket
+val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')
+
+// parse the data, group it, window it, and aggregate the counts
+val result: DataStream[(String, Long)] = text
+  // split up the lines in pairs (2-tuples) containing: (word,1)
+  .flatMap(_.toLowerCase.split("\\s"))
+  .filter(_.nonEmpty)
+  .map((_, 1L))
+  // group by the tuple field "0" and sum up tuple field "1"
+  .keyBy(0)
+  .timeWindow(Time.seconds(5))
+  .sum(1)
+
+CassandraSink.addSink(result)
+  .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
+  .setHost("127.0.0.1")
   .build()
+
+result.print().setParallelism(1)
 {% endhighlight %}
 </div>
+
 </div>
 
-The Cassandra sinks support both tuples and POJO's that use DataStax annotations.
-Flink automatically detects which type of input is used.
 
-Example for such a Pojo:
+### Cassandra Sink Example for Streaming POJO Data Type
+An example of streaming a POJO data type and storing the same POJO entity back to Cassandra. In addition, this POJO implementation needs to follow the [DataStax Java Driver Manual](http://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/creating/) to annotate the class, as each field of this entity is mapped to an associated column of the designated table using the DataStax Java Driver `com.datastax.driver.mapping.Mapper` class.
+
+The mapping of each table column can be defined through annotations placed on a field declaration in the POJO class. For details of the mapping, please refer to the CQL documentation on [Definition of Mapped Classes](http://docs.datastax.com/en/developer/java-driver/3.1/manual/object_mapper/creating/) and [CQL Data types](https://docs.datastax.com/en/cql/3.1/cql/cql_reference/cql_data_types_c.html).
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream<String> text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream<WordCount> result = text
+        .flatMap(new FlatMapFunction<String, WordCount>() {
+            @Override
+            public void flatMap(String value, Collector<WordCount> out) {
+                // normalize and split the line
+                String[] words = value.toLowerCase().split("\\s");
+
+                // emit the pairs
+                for (String word : words) {
+                    if (!word.isEmpty()) {
+                        //Do not accept empty word, since word is defined as primary key in C* table
+                        out.collect(new WordCount(word, 1L));
+                    }
+                }
+            }
+        })
+        .keyBy("word")
+        .timeWindow(Time.seconds(5))
+        .reduce(new ReduceFunction<WordCount>() {
+            @Override
+            public WordCount reduce(WordCount a, WordCount b) {
+                return new WordCount(a.getWord(), a.getCount() + b.getCount());
+            }
+        });
+
+CassandraSink.addSink(result)
+        .setHost("127.0.0.1")
+        .build();
+
+
+@Table(keyspace = "example", name = "wordcount")
+public class WordCount {
+
+    @Column(name = "word")
+    private String word = "";
+
+    @Column(name = "count")
+    private long count = 0;
+
+    public WordCount() {}
+
+    public WordCount(String word, long count) {
+        this.setWord(word);
+        this.setCount(count);
+    }
 
-@Table(keyspace= "test", name = "mappersink")
-public class Pojo implements Serializable {
-
-	private static final long serialVersionUID = 1038054554690916991L;
-
-	@Column(name = "id")
-	private long id;
-	@Column(name = "value")
-	private String value;
-
-	public Pojo(long id, String value){
-		this.id = id;
-		this.value = value;
-	}
+    public String getWord() {
+        return word;
+    }
 
-	public long getId() {
-		return id;
-	}
+    public void setWord(String word) {
+        this.word = word;
+    }
 
-	public void setId(long id) {
-		this.id = id;
-	}
+    public long getCount() {
+        return count;
+    }
 
-	public String getValue() {
-		return value;
-	}
+    public void setCount(long count) {
+        this.count = count;
+    }
 
-	public void setValue(String value) {
-		this.value = value;
-	}
+    @Override
+    public String toString() {
+        return getWord() + " : " + getCount();
+    }
 }
 {% endhighlight %}
 </div>
+
 </div>
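+
+Under the hood, the connector persists annotated entities like `WordCount` through the driver's object mapper, roughly as in the following sketch (given an open driver `Session`; an illustration, not the sink's actual internal code):
+
+{% highlight java %}
+Mapper<WordCount> mapper = new MappingManager(session).mapper(WordCount.class);
+// upserts into example.wordcount, as declared by the @Table annotation
+mapper.save(new WordCount("hello", 1L));
+{% endhighlight %}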
 
 {% top %}