You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by mcfongtw <gi...@git.apache.org> on 2017/09/21 11:31:17 UTC

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

GitHub user mcfongtw opened a pull request:

    https://github.com/apache/flink/pull/4696

    [FLINK-7632] [document] Overhaul on Cassandra connector doc

    ## What is the purpose of the change
    Refactor Cassandra connector documentation by providing 
    - in-depth information about Cassandra Pojo sink and Cassandra Tuple sink.
    - meaningful examples to show distinction between streaming over Java Tuple and Pojo data types.
    
    ## Brief change log
    - Refactor the current usage of on Cassandra Sink w/ more in-depth information.
    - Provides examples for Pojo and Java Tuple data types
    
    
    ## Verifying this change
    Have tested w/ local doc server (w/ docker setup)
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive):no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
    
    ## Documentation
      - Does this pull request introduce a new feature? no
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mcfongtw/flink FLINK-7632

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4696.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4696
    
----

----


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140517696
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     
     <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.</p>
     
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
     
    -#### Example
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>:However, current Cassandra Sink implementation does not flush the pending mutations  before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. </p>
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    @Override
    -    public Cluster buildCluster(Cluster.Builder builder) {
    -      return builder.addContactPoint("127.0.0.1").build();
    -    }
    -  })
    -  .build();
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.enableCheckpointing(5000); // checkpoint every 5000 msecs
     {% endhighlight %}
     </div>
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    override def buildCluster(builder: Cluster.Builder): Cluster = {
    -      builder.addContactPoint("127.0.0.1").build()
    -    }
    -  })
    -  .build()
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +## Examples
    +
    +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="CQL" markdown="1">
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +    word text,
    +    count bigint,
    +    PRIMARY KEY(word)
    +    );
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Cassandra Sink Example for Streaming Java Tuple Data Type
    +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement.
    +
    +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
    +
    +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.`
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// get the execution environment
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +// get input data by connecting to the socket
    +DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    +
    +// parse the data, group it, window it, and aggregate the counts
    +DataStream<Tuple2<String, Long>> result = text
    +
    +        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    +            @Override
    +            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
    +                // normalize and split the line
    +                String[] words = value.toLowerCase().split("\\W+");
    +
    +                // emit the pairs
    +                for (String word : words) {
    +                    //Do not accept empty word, since word is defined as primary key in C* table
    +                    if (!word.isEmpty()) {
    +                        out.collect(new Tuple2<String, Long>(word, 1L));
    +                    }
    +                }
    +            }
    +        })
    +
    +        .keyBy(0)
    +        .timeWindow(Time.seconds(5))
    +        .sum(1)
    +        ;
    +
    +CassandraSink.addSink(result)
    +        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
    +        .setHost("127.0.0.1")
    +        .build();
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +CassandraSink.addSink(input.javaStream)
    --- End diff --
    
    we generally try to keep the java&scala examples self-contained, even if it requires duplicated information.


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140620261
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     
     <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.</p>
     
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
     
    -#### Example
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>:However, current Cassandra Sink implementation does not flush the pending mutations  before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. </p>
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment:
    --- End diff --
    
    Very true. Thanks.


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140620296
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     
     <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.</p>
     
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
     
    -#### Example
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>:However, current Cassandra Sink implementation does not flush the pending mutations  before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. </p>
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    @Override
    -    public Cluster buildCluster(Cluster.Builder builder) {
    -      return builder.addContactPoint("127.0.0.1").build();
    -    }
    -  })
    -  .build();
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.enableCheckpointing(5000); // checkpoint every 5000 msecs
     {% endhighlight %}
     </div>
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    override def buildCluster(builder: Cluster.Builder): Cluster = {
    -      builder.addContactPoint("127.0.0.1").build()
    -    }
    -  })
    -  .build()
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +## Examples
    +
    +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="CQL" markdown="1">
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +    word text,
    +    count bigint,
    +    PRIMARY KEY(word)
    +    );
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Cassandra Sink Example for Streaming Java Tuple Data Type
    +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement.
    +
    +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
    +
    +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.`
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// get the execution environment
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +// get input data by connecting to the socket
    +DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    +
    +// parse the data, group it, window it, and aggregate the counts
    +DataStream<Tuple2<String, Long>> result = text
    +
    +        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    +            @Override
    +            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
    +                // normalize and split the line
    +                String[] words = value.toLowerCase().split("\\W+");
    +
    +                // emit the pairs
    +                for (String word : words) {
    +                    //Do not accept empty word, since word is defined as primary key in C* table
    +                    if (!word.isEmpty()) {
    +                        out.collect(new Tuple2<String, Long>(word, 1L));
    +                    }
    +                }
    +            }
    +        })
    +
    +        .keyBy(0)
    +        .timeWindow(Time.seconds(5))
    +        .sum(1)
    +        ;
    +
    +CassandraSink.addSink(result)
    +        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
    +        .setHost("127.0.0.1")
    +        .build();
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +CassandraSink.addSink(input.javaStream)
    --- End diff --
    
    Okay. 


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140517986
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     
     <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.</p>
     
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
     
    -#### Example
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>:However, current Cassandra Sink implementation does not flush the pending mutations  before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. </p>
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    @Override
    -    public Cluster buildCluster(Cluster.Builder builder) {
    -      return builder.addContactPoint("127.0.0.1").build();
    -    }
    -  })
    -  .build();
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.enableCheckpointing(5000); // checkpoint every 5000 msecs
     {% endhighlight %}
     </div>
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    override def buildCluster(builder: Cluster.Builder): Cluster = {
    -      builder.addContactPoint("127.0.0.1").build()
    -    }
    -  })
    -  .build()
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +## Examples
    +
    +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="CQL" markdown="1">
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +    word text,
    +    count bigint,
    +    PRIMARY KEY(word)
    +    );
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Cassandra Sink Example for Streaming Java Tuple Data Type
    +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement.
    +
    +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
    +
    +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.`
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// get the execution environment
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +// get input data by connecting to the socket
    +DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    +
    +// parse the data, group it, window it, and aggregate the counts
    +DataStream<Tuple2<String, Long>> result = text
    +
    +        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    +            @Override
    +            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
    +                // normalize and split the line
    +                String[] words = value.toLowerCase().split("\\W+");
    +
    +                // emit the pairs
    +                for (String word : words) {
    +                    //Do not accept empty word, since word is defined as primary key in C* table
    +                    if (!word.isEmpty()) {
    +                        out.collect(new Tuple2<String, Long>(word, 1L));
    +                    }
    +                }
    +            }
    +        })
    +
    +        .keyBy(0)
    +        .timeWindow(Time.seconds(5))
    +        .sum(1)
    +        ;
    +
    +CassandraSink.addSink(result)
    +        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
    +        .setHost("127.0.0.1")
    +        .build();
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +CassandraSink.addSink(input.javaStream)
    +    .setQuery("INSERT INTO test.wordcount(word, count) values (?, ?);")
    +    .setClusterBuilder(new ClusterBuilder() {
    +        @Override
    +        def buildCluster(builder: Cluster.Builder): Cluster = {
    +            builder.addContactPoint("127.0.0.1").build()
    +        }
    +    })
    +    .build()
     {% endhighlight %}
     </div>
     </div>
     
    -The Cassandra sinks support both tuples and POJO's that use DataStax annotations.
    -Flink automatically detects which type of input is used.
     
    -Example for such a Pojo:
    +### Cassandra Sink Example for Streaming POJO Data Type
    +An example of streaming a POJO data type and store the same POJO entity back to Cassandra. In addition, this POJO implementation needs to follow [DataStax Java Driver Manual](http://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/creating/) to annotate the class as Cassandra connector internally maps each field of this entity to an associated column of the desginated Table using `com.datastax.driver.mapping.Mapper` class of DataStax Java Driver.
    +
    +Please note that if the upsert query was set, an `IllegalArgumentException` would be thrown with the following error message `Specifying a query is not allowed when using a Pojo-Stream as input.`
    +
    +For each CQL defined data type for columns, please refer to [CQL Documentation](https://docs.datastax.com/en/cql/3.1/cql/cql_reference/cql_data_types_c.html)
    --- End diff --
    
    the first part of the sentence doesn't quite make sense to me, the wording is a bit odd.


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r141615691
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     
     <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.</p>
     
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
     
    -#### Example
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>:However, current Cassandra Sink implementation does not flush the pending mutations  before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. </p>
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    @Override
    -    public Cluster buildCluster(Cluster.Builder builder) {
    -      return builder.addContactPoint("127.0.0.1").build();
    -    }
    -  })
    -  .build();
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.enableCheckpointing(5000); // checkpoint every 5000 msecs
     {% endhighlight %}
     </div>
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    override def buildCluster(builder: Cluster.Builder): Cluster = {
    -      builder.addContactPoint("127.0.0.1").build()
    -    }
    -  })
    -  .build()
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +## Examples
    +
    +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="CQL" markdown="1">
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +    word text,
    +    count bigint,
    +    PRIMARY KEY(word)
    +    );
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Cassandra Sink Example for Streaming Java Tuple Data Type
    +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement.
    +
    +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
    +
    +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.`
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// get the execution environment
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +// get input data by connecting to the socket
    +DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    +
    +// parse the data, group it, window it, and aggregate the counts
    +DataStream<Tuple2<String, Long>> result = text
    +
    +        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    +            @Override
    +            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
    +                // normalize and split the line
    +                String[] words = value.toLowerCase().split("\\W+");
    +
    +                // emit the pairs
    +                for (String word : words) {
    +                    //Do not accept empty word, since word is defined as primary key in C* table
    +                    if (!word.isEmpty()) {
    +                        out.collect(new Tuple2<String, Long>(word, 1L));
    +                    }
    +                }
    +            }
    +        })
    +
    +        .keyBy(0)
    +        .timeWindow(Time.seconds(5))
    +        .sum(1)
    +        ;
    +
    +CassandraSink.addSink(result)
    +        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
    +        .setHost("127.0.0.1")
    +        .build();
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +CassandraSink.addSink(input.javaStream)
    +    .setQuery("INSERT INTO test.wordcount(word, count) values (?, ?);")
    +    .setClusterBuilder(new ClusterBuilder() {
    +        @Override
    +        def buildCluster(builder: Cluster.Builder): Cluster = {
    +            builder.addContactPoint("127.0.0.1").build()
    +        }
    +    })
    +    .build()
     {% endhighlight %}
     </div>
     </div>
     
    -The Cassandra sinks support both tuples and POJO's that use DataStax annotations.
    -Flink automatically detects which type of input is used.
     
    -Example for such a Pojo:
    +### Cassandra Sink Example for Streaming POJO Data Type
    +An example of streaming a POJO data type and store the same POJO entity back to Cassandra. In addition, this POJO implementation needs to follow [DataStax Java Driver Manual](http://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/creating/) to annotate the class as Cassandra connector internally maps each field of this entity to an associated column of the desginated Table using `com.datastax.driver.mapping.Mapper` class of DataStax Java Driver.
    +
    +Please note that if the upsert query was set, an `IllegalArgumentException` would be thrown with the following error message `Specifying a query is not allowed when using a Pojo-Stream as input.`
    +
    +For each CQL defined data type for columns, please refer to [CQL Documentation](https://docs.datastax.com/en/cql/3.1/cql/cql_reference/cql_data_types_c.html)
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    +// get the execution environment
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +// get input data by connecting to the socket
    +DataStream<String> text = env.socketTextStream(hostname, port, "\n");
     
    -@Table(keyspace= "test", name = "mappersink")
    -public class Pojo implements Serializable {
    +// parse the data, group it, window it, and aggregate the counts
    +DataStream<WordCount> result = text
     
    -	private static final long serialVersionUID = 1038054554690916991L;
    +        .flatMap(new FlatMapFunction<String, WordCount>() {
    +            @Override
    +            public void flatMap(String value, Collector<WordCount> out) {
    +                for (String word : value.split("\\s")) {
    +                    if (!word.isEmpty()) {
    +                        //Do not accept empty word, since word is defined as primary key in C* table
    +                        out.collect(new WordCount(word, 1L));
    +                    }
    +                }
    +            }
    +        })
     
    -	@Column(name = "id")
    -	private long id;
    -	@Column(name = "value")
    -	private String value;
    +        .keyBy("word")
    --- End diff --
    
    ah nevermind, i didn't saw that the Tuple example does a keyBy too. 


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140620229
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     
     <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.</p>
     
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
     
    -#### Example
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>:However, current Cassandra Sink implementation does not flush the pending mutations  before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. </p>
    --- End diff --
    
    Good eye. Thanks


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r141785837
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -35,37 +43,47 @@ To use this connector, add the following dependency to your project:
     </dependency>
     {% endhighlight %}
     
    -Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +
    +## Installing Apache Cassandra
    +There are multiple ways to bring up a Cassandra instance on local machine:
    +
    +1. Follow the instructions from [Cassandra Getting Started page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
    +2. Launch a container running Cassandra from [Official Docker Repository](https://hub.docker.com/_/cassandra/)
     
    -#### Installing Apache Cassandra
    -Follow the instructions from the [Cassandra Getting Started page](http://wiki.apache.org/cassandra/GettingStarted).
    +## Cassandra Sink
    +Flink Cassandra connector currently supports Apache Cassandra as a data sink.
     
    -#### Cassandra Sink
    +### Configurations
     
     Flink's Cassandra sink are created by using the static CassandraSink.addSink(DataStream<IN> input) method.
    -This method returns a CassandraSinkBuilder, which offers methods to further configure the sink.
    +This method returns a CassandraSinkBuilder, which offers methods to further configure the sink, and finally `build()` the sink instance.
     
     The following configuration methods can be used:
     
    -1. setQuery(String query)
    -2. setHost(String host[, int port])
    -3. setClusterBuilder(ClusterBuilder builder)
    -4. enableWriteAheadLog([CheckpointCommitter committer])
    -5. build()
    +1. _setQuery(String query)_
    +    * sets the upsert query that is executed for every record the sink receives.
    +    * internally treated as CQL prepared statement, in which parameters could be shared or anonymous.
    --- End diff --
    
    It is true that we don't need to get to that level of details. I could rephrase to what you suggested. 


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140620211
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     
     <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.</p>
     
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
     
    -#### Example
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>:However, current Cassandra Sink implementation does not flush the pending mutations  before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. </p>
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    @Override
    -    public Cluster buildCluster(Cluster.Builder builder) {
    -      return builder.addContactPoint("127.0.0.1").build();
    -    }
    -  })
    -  .build();
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.enableCheckpointing(5000); // checkpoint every 5000 msecs
     {% endhighlight %}
     </div>
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    override def buildCluster(builder: Cluster.Builder): Cluster = {
    -      builder.addContactPoint("127.0.0.1").build()
    -    }
    -  })
    -  .build()
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +## Examples
    +
    +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="CQL" markdown="1">
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +    word text,
    +    count bigint,
    +    PRIMARY KEY(word)
    +    );
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Cassandra Sink Example for Streaming Java Tuple Data Type
    +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement.
    +
    +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
    +
    +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.`
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// get the execution environment
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +// get input data by connecting to the socket
    +DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    +
    +// parse the data, group it, window it, and aggregate the counts
    +DataStream<Tuple2<String, Long>> result = text
    +
    +        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    +            @Override
    +            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
    +                // normalize and split the line
    +                String[] words = value.toLowerCase().split("\\W+");
    +
    +                // emit the pairs
    +                for (String word : words) {
    +                    //Do not accept empty word, since word is defined as primary key in C* table
    +                    if (!word.isEmpty()) {
    +                        out.collect(new Tuple2<String, Long>(word, 1L));
    +                    }
    +                }
    +            }
    +        })
    +
    +        .keyBy(0)
    +        .timeWindow(Time.seconds(5))
    +        .sum(1)
    +        ;
    +
    +CassandraSink.addSink(result)
    +        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
    +        .setHost("127.0.0.1")
    +        .build();
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +CassandraSink.addSink(input.javaStream)
    +    .setQuery("INSERT INTO test.wordcount(word, count) values (?, ?);")
    +    .setClusterBuilder(new ClusterBuilder() {
    +        @Override
    +        def buildCluster(builder: Cluster.Builder): Cluster = {
    +            builder.addContactPoint("127.0.0.1").build()
    +        }
    +    })
    +    .build()
     {% endhighlight %}
     </div>
     </div>
     
    -The Cassandra sinks support both tuples and POJO's that use DataStax annotations.
    -Flink automatically detects which type of input is used.
     
    -Example for such a Pojo:
    +### Cassandra Sink Example for Streaming POJO Data Type
    +An example of streaming a POJO data type and store the same POJO entity back to Cassandra. In addition, this POJO implementation needs to follow [DataStax Java Driver Manual](http://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/creating/) to annotate the class as Cassandra connector internally maps each field of this entity to an associated column of the desginated Table using `com.datastax.driver.mapping.Mapper` class of DataStax Java Driver.
    +
    +Please note that if the upsert query was set, an `IllegalArgumentException` would be thrown with the following error message `Specifying a query is not allowed when using a Pojo-Stream as input.`
    +
    +For each CQL defined data type for columns, please refer to [CQL Documentation](https://docs.datastax.com/en/cql/3.1/cql/cql_reference/cql_data_types_c.html)
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    +// get the execution environment
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +// get input data by connecting to the socket
    +DataStream<String> text = env.socketTextStream(hostname, port, "\n");
     
    -@Table(keyspace= "test", name = "mappersink")
    -public class Pojo implements Serializable {
    +// parse the data, group it, window it, and aggregate the counts
    +DataStream<WordCount> result = text
     
    -	private static final long serialVersionUID = 1038054554690916991L;
    +        .flatMap(new FlatMapFunction<String, WordCount>() {
    +            @Override
    +            public void flatMap(String value, Collector<WordCount> out) {
    +                for (String word : value.split("\\s")) {
    +                    if (!word.isEmpty()) {
    +                        //Do not accept empty word, since word is defined as primary key in C* table
    +                        out.collect(new WordCount(word, 1L));
    +                    }
    +                }
    +            }
    +        })
     
    -	@Column(name = "id")
    -	private long id;
    -	@Column(name = "value")
    -	private String value;
    +        .keyBy("word")
    --- End diff --
    
    Thanks. Do you refer difference as for the string splitting or overloading keyBy methods? I will unify string splitting part. As for the keyBy, perhaps user could get to know that it is valid to use keyBy("field name") corresponding to an accessible field in the Pojo class. 


---

[GitHub] flink issue #4696: [FLINK-7632] [document] Overhaul on Cassandra connector d...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4696
  
    Looks really god now, merging.


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r141611894
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -35,37 +43,47 @@ To use this connector, add the following dependency to your project:
     </dependency>
     {% endhighlight %}
     
    -Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +
    +## Installing Apache Cassandra
    +There are multiple ways to bring up a Cassandra instance on local machine:
    +
    +1. Follow the instructions from [Cassandra Getting Started page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
    +2. Launch a container running Cassandra from [Official Docker Repository](https://hub.docker.com/_/cassandra/)
     
    -#### Installing Apache Cassandra
    -Follow the instructions from the [Cassandra Getting Started page](http://wiki.apache.org/cassandra/GettingStarted).
    +## Cassandra Sink
    +Flink Cassandra connector currently supports Apache Cassandra as a data sink.
     
    -#### Cassandra Sink
    +### Configurations
     
     Flink's Cassandra sink are created by using the static CassandraSink.addSink(DataStream<IN> input) method.
    -This method returns a CassandraSinkBuilder, which offers methods to further configure the sink.
    +This method returns a CassandraSinkBuilder, which offers methods to further configure the sink, and finally `build()` the sink instance.
     
     The following configuration methods can be used:
     
    -1. setQuery(String query)
    -2. setHost(String host[, int port])
    -3. setClusterBuilder(ClusterBuilder builder)
    -4. enableWriteAheadLog([CheckpointCommitter committer])
    -5. build()
    +1. _setQuery(String query)_
    +    * sets the upsert query that is executed for every record the sink receives.
    +    * internally treated as CQL prepared statement, in which parameters could be shared or anonymous.
    --- End diff --
    
    -> The query is internally treated as a CQL ...


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140517823
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     
     <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.</p>
     
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
     
    -#### Example
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>:However, current Cassandra Sink implementation does not flush the pending mutations  before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. </p>
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    @Override
    -    public Cluster buildCluster(Cluster.Builder builder) {
    -      return builder.addContactPoint("127.0.0.1").build();
    -    }
    -  })
    -  .build();
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.enableCheckpointing(5000); // checkpoint every 5000 msecs
     {% endhighlight %}
     </div>
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    override def buildCluster(builder: Cluster.Builder): Cluster = {
    -      builder.addContactPoint("127.0.0.1").build()
    -    }
    -  })
    -  .build()
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +## Examples
    +
    +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="CQL" markdown="1">
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +    word text,
    +    count bigint,
    +    PRIMARY KEY(word)
    +    );
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Cassandra Sink Example for Streaming Java Tuple Data Type
    +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement.
    +
    +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
    +
    +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.`
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// get the execution environment
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +// get input data by connecting to the socket
    +DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    +
    +// parse the data, group it, window it, and aggregate the counts
    +DataStream<Tuple2<String, Long>> result = text
    +
    +        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    +            @Override
    +            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
    +                // normalize and split the line
    +                String[] words = value.toLowerCase().split("\\W+");
    +
    +                // emit the pairs
    +                for (String word : words) {
    +                    //Do not accept empty word, since word is defined as primary key in C* table
    +                    if (!word.isEmpty()) {
    +                        out.collect(new Tuple2<String, Long>(word, 1L));
    +                    }
    +                }
    +            }
    +        })
    +
    +        .keyBy(0)
    +        .timeWindow(Time.seconds(5))
    +        .sum(1)
    +        ;
    +
    +CassandraSink.addSink(result)
    +        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
    +        .setHost("127.0.0.1")
    +        .build();
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +CassandraSink.addSink(input.javaStream)
    +    .setQuery("INSERT INTO test.wordcount(word, count) values (?, ?);")
    +    .setClusterBuilder(new ClusterBuilder() {
    +        @Override
    +        def buildCluster(builder: Cluster.Builder): Cluster = {
    +            builder.addContactPoint("127.0.0.1").build()
    +        }
    +    })
    +    .build()
     {% endhighlight %}
     </div>
     </div>
     
    -The Cassandra sinks support both tuples and POJO's that use DataStax annotations.
    -Flink automatically detects which type of input is used.
     
    -Example for such a Pojo:
    +### Cassandra Sink Example for Streaming POJO Data Type
    +An example of streaming a POJO data type and store the same POJO entity back to Cassandra. In addition, this POJO implementation needs to follow [DataStax Java Driver Manual](http://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/creating/) to annotate the class as Cassandra connector internally maps each field of this entity to an associated column of the desginated Table using `com.datastax.driver.mapping.Mapper` class of DataStax Java Driver.
    +
    +Please note that if the upsert query was set, an `IllegalArgumentException` would be thrown with the following error message `Specifying a query is not allowed when using a Pojo-Stream as input.`
    --- End diff --
    
    same as bove


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140518123
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     
     <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.</p>
     
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
     
    -#### Example
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>:However, current Cassandra Sink implementation does not flush the pending mutations  before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. </p>
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    @Override
    -    public Cluster buildCluster(Cluster.Builder builder) {
    -      return builder.addContactPoint("127.0.0.1").build();
    -    }
    -  })
    -  .build();
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.enableCheckpointing(5000); // checkpoint every 5000 msecs
     {% endhighlight %}
     </div>
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    override def buildCluster(builder: Cluster.Builder): Cluster = {
    -      builder.addContactPoint("127.0.0.1").build()
    -    }
    -  })
    -  .build()
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +## Examples
    +
    +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="CQL" markdown="1">
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +    word text,
    +    count bigint,
    +    PRIMARY KEY(word)
    +    );
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Cassandra Sink Example for Streaming Java Tuple Data Type
    +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement.
    +
    +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
    +
    +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.`
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// get the execution environment
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +// get input data by connecting to the socket
    +DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    +
    +// parse the data, group it, window it, and aggregate the counts
    +DataStream<Tuple2<String, Long>> result = text
    +
    +        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    +            @Override
    +            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
    +                // normalize and split the line
    +                String[] words = value.toLowerCase().split("\\W+");
    +
    +                // emit the pairs
    +                for (String word : words) {
    +                    //Do not accept empty word, since word is defined as primary key in C* table
    +                    if (!word.isEmpty()) {
    +                        out.collect(new Tuple2<String, Long>(word, 1L));
    +                    }
    +                }
    +            }
    +        })
    +
    +        .keyBy(0)
    +        .timeWindow(Time.seconds(5))
    +        .sum(1)
    +        ;
    +
    +CassandraSink.addSink(result)
    +        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
    +        .setHost("127.0.0.1")
    +        .build();
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +CassandraSink.addSink(input.javaStream)
    +    .setQuery("INSERT INTO test.wordcount(word, count) values (?, ?);")
    +    .setClusterBuilder(new ClusterBuilder() {
    +        @Override
    +        def buildCluster(builder: Cluster.Builder): Cluster = {
    +            builder.addContactPoint("127.0.0.1").build()
    +        }
    +    })
    +    .build()
     {% endhighlight %}
     </div>
     </div>
     
    -The Cassandra sinks support both tuples and POJO's that use DataStax annotations.
    -Flink automatically detects which type of input is used.
     
    -Example for such a Pojo:
    +### Cassandra Sink Example for Streaming POJO Data Type
    +An example of streaming a POJO data type and store the same POJO entity back to Cassandra. In addition, this POJO implementation needs to follow [DataStax Java Driver Manual](http://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/creating/) to annotate the class as Cassandra connector internally maps each field of this entity to an associated column of the desginated Table using `com.datastax.driver.mapping.Mapper` class of DataStax Java Driver.
    +
    +Please note that if the upsert query was set, an `IllegalArgumentException` would be thrown with the following error message `Specifying a query is not allowed when using a Pojo-Stream as input.`
    +
    +For each CQL defined data type for columns, please refer to [CQL Documentation](https://docs.datastax.com/en/cql/3.1/cql/cql_reference/cql_data_types_c.html)
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    +// get the execution environment
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +// get input data by connecting to the socket
    +DataStream<String> text = env.socketTextStream(hostname, port, "\n");
     
    -@Table(keyspace= "test", name = "mappersink")
    -public class Pojo implements Serializable {
    +// parse the data, group it, window it, and aggregate the counts
    +DataStream<WordCount> result = text
     
    -	private static final long serialVersionUID = 1038054554690916991L;
    +        .flatMap(new FlatMapFunction<String, WordCount>() {
    +            @Override
    +            public void flatMap(String value, Collector<WordCount> out) {
    +                for (String word : value.split("\\s")) {
    +                    if (!word.isEmpty()) {
    +                        //Do not accept empty word, since word is defined as primary key in C* table
    +                        out.collect(new WordCount(word, 1L));
    +                    }
    +                }
    +            }
    +        })
     
    -	@Column(name = "id")
    -	private long id;
    -	@Column(name = "value")
    -	private String value;
    +        .keyBy("word")
    --- End diff --
    
    why is this example different than the previous one about tuples?


---

[GitHub] flink issue #4696: [FLINK-7632] [document] Overhaul on Cassandra connector d...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on the issue:

    https://github.com/apache/flink/pull/4696
  
    @zentol, Could you take another look? Thank you.


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r141612126
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -35,37 +43,47 @@ To use this connector, add the following dependency to your project:
     </dependency>
     {% endhighlight %}
     
    -Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +
    +## Installing Apache Cassandra
    +There are multiple ways to bring up a Cassandra instance on local machine:
    +
    +1. Follow the instructions from [Cassandra Getting Started page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
    +2. Launch a container running Cassandra from [Official Docker Repository](https://hub.docker.com/_/cassandra/)
     
    -#### Installing Apache Cassandra
    -Follow the instructions from the [Cassandra Getting Started page](http://wiki.apache.org/cassandra/GettingStarted).
    +## Cassandra Sink
    +Flink Cassandra connector currently supports Apache Cassandra as a data sink.
     
    -#### Cassandra Sink
    +### Configurations
     
     Flink's Cassandra sink are created by using the static CassandraSink.addSink(DataStream<IN> input) method.
    -This method returns a CassandraSinkBuilder, which offers methods to further configure the sink.
    +This method returns a CassandraSinkBuilder, which offers methods to further configure the sink, and finally `build()` the sink instance.
     
     The following configuration methods can be used:
     
    -1. setQuery(String query)
    -2. setHost(String host[, int port])
    -3. setClusterBuilder(ClusterBuilder builder)
    -4. enableWriteAheadLog([CheckpointCommitter committer])
    -5. build()
    +1. _setQuery(String query)_
    +    * sets the upsert query that is executed for every record the sink receives.
    +    * internally treated as CQL prepared statement, in which parameters could be shared or anonymous.
    +    * __DO__ set the upsert query for processing __Tuple__ data type
    +    * __DO NOT__ set the query for processing __POJO__ data type.
    +2. _setClusterBuilder()_
    +    * sets the cluster builder that is used to configure the connection to cassandra with more sophisticated settings such as consistency level, retry policy and etc.
    --- End diff --
    
    sets -> Sets


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140517182
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     
     <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.</p>
     
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
     
    -#### Example
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>:However, current Cassandra Sink implementation does not flush the pending mutations  before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. </p>
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    @Override
    -    public Cluster buildCluster(Cluster.Builder builder) {
    -      return builder.addContactPoint("127.0.0.1").build();
    -    }
    -  })
    -  .build();
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.enableCheckpointing(5000); // checkpoint every 5000 msecs
     {% endhighlight %}
     </div>
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    override def buildCluster(builder: Cluster.Builder): Cluster = {
    -      builder.addContactPoint("127.0.0.1").build()
    -    }
    -  })
    -  .build()
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +## Examples
    +
    +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="CQL" markdown="1">
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +    word text,
    +    count bigint,
    +    PRIMARY KEY(word)
    +    );
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Cassandra Sink Example for Streaming Java Tuple Data Type
    +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement.
    +
    +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
    +
    +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.`
    --- End diff --
    
    I would leave this out. Its one of those things that are easily out-dated, and don't provide immediate value when reading the docs for the first time. If a user stumbles upon this the error message should be self-explanatory; if it isn't we should change it accordingly.


---

[GitHub] flink issue #4696: [FLINK-7632] [document] Overhaul on Cassandra connector d...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on the issue:

    https://github.com/apache/flink/pull/4696
  
    Hi, @zentol , could you take another look? Thank you. 


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r141611240
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -35,37 +43,47 @@ To use this connector, add the following dependency to your project:
     </dependency>
     {% endhighlight %}
     
    -Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +
    +## Installing Apache Cassandra
    +There are multiple ways to bring up a Cassandra instance on local machine:
    +
    +1. Follow the instructions from [Cassandra Getting Started page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
    +2. Launch a container running Cassandra from [Official Docker Repository](https://hub.docker.com/_/cassandra/)
     
    -#### Installing Apache Cassandra
    -Follow the instructions from the [Cassandra Getting Started page](http://wiki.apache.org/cassandra/GettingStarted).
    +## Cassandra Sink
    +Flink Cassandra connector currently supports Apache Cassandra as a data sink.
    --- End diff --
    
    This line is a bit weird, i Would just remove it.


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r141612087
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -35,37 +43,47 @@ To use this connector, add the following dependency to your project:
     </dependency>
     {% endhighlight %}
     
    -Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +
    +## Installing Apache Cassandra
    +There are multiple ways to bring up a Cassandra instance on local machine:
    +
    +1. Follow the instructions from [Cassandra Getting Started page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
    +2. Launch a container running Cassandra from [Official Docker Repository](https://hub.docker.com/_/cassandra/)
     
    -#### Installing Apache Cassandra
    -Follow the instructions from the [Cassandra Getting Started page](http://wiki.apache.org/cassandra/GettingStarted).
    +## Cassandra Sink
    +Flink Cassandra connector currently supports Apache Cassandra as a data sink.
     
    -#### Cassandra Sink
    +### Configurations
     
     Flink's Cassandra sink are created by using the static CassandraSink.addSink(DataStream<IN> input) method.
    -This method returns a CassandraSinkBuilder, which offers methods to further configure the sink.
    +This method returns a CassandraSinkBuilder, which offers methods to further configure the sink, and finally `build()` the sink instance.
     
     The following configuration methods can be used:
     
    -1. setQuery(String query)
    -2. setHost(String host[, int port])
    -3. setClusterBuilder(ClusterBuilder builder)
    -4. enableWriteAheadLog([CheckpointCommitter committer])
    -5. build()
    +1. _setQuery(String query)_
    +    * sets the upsert query that is executed for every record the sink receives.
    +    * internally treated as CQL prepared statement, in which parameters could be shared or anonymous.
    +    * __DO__ set the upsert query for processing __Tuple__ data type
    +    * __DO NOT__ set the query for processing __POJO__ data type.
    --- End diff --
    
    type -> types


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4696


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r141614919
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,77 +96,189 @@ Note that that enabling this feature will have an adverse impact on latency.
     
     <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.</p>
     
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
    +
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. </p>
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
     
    -#### Example
    +## Examples
    +
    +The Cassandra sinks currently support both Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="CQL" markdown="1">
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +    word text,
    +    count bigint,
    +    PRIMARY KEY(word)
    +    );
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Cassandra Sink Example for Streaming Tuple Data Type
    +While storing the result with Java/Scala Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement.
    +
    +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    @Override
    -    public Cluster buildCluster(Cluster.Builder builder) {
    -      return builder.addContactPoint("127.0.0.1").build();
    -    }
    -  })
    -  .build();
    +// get the execution environment
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +// get input data by connecting to the socket
    +DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    +
    +// parse the data, group it, window it, and aggregate the counts
    +DataStream<Tuple2<String, Long>> result = text
    +        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    +            @Override
    +            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
    +                // normalize and split the line
    +                String[] words = value.toLowerCase().split("\\s");
    +
    +                // emit the pairs
    +                for (String word : words) {
    +                    //Do not accept empty word, since word is defined as primary key in C* table
    +                    if (!word.isEmpty()) {
    +                        out.collect(new Tuple2<String, Long>(word, 1L));
    +                    }
    +                }
    +            }
    +        })
    +        .keyBy(0)
    +        .timeWindow(Time.seconds(5))
    +        .sum(1)
    +        ;
    +
    +CassandraSink.addSink(result)
    +        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
    +        .setHost("127.0.0.1")
    +        .build();
     {% endhighlight %}
     </div>
    +
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    override def buildCluster(builder: Cluster.Builder): Cluster = {
    -      builder.addContactPoint("127.0.0.1").build()
    -    }
    -  })
    +val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +// get input data by connecting to the socket
    +val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')
    +
    +// parse the data, group it, window it, and aggregate the counts
    +val result: DataStream[(String, Long)] = text
    +  // split up the lines in pairs (2-tuples) containing: (word,1)
    +  .flatMap(_.toLowerCase.split("\\s"))
    +  .filter(_.nonEmpty)
    +  .map((_, 1L))
    +  // group by the tuple field "0" and sum up tuple field "1"
    +  .keyBy(0)
    +  .timeWindow(Time.seconds(5))
    +  .sum(1)
    +
    +CassandraSink.addSink(result)
    +  .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
    +  .setHost("127.0.0.1")
       .build()
    +
    +result.print().setParallelism(1)
     {% endhighlight %}
     </div>
    +
     </div>
     
    -The Cassandra sinks support both tuples and POJO's that use DataStax annotations.
    -Flink automatically detects which type of input is used.
     
    -Example for such a Pojo:
    +### Cassandra Sink Example for Streaming POJO Data Type
    +An example of streaming a POJO data type and store the same POJO entity back to Cassandra. In addition, this POJO implementation needs to follow [DataStax Java Driver Manual](http://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/creating/) to annotate the class as Cassandra connector internally maps each field of this entity to an associated column of the desginated Table using `com.datastax.driver.mapping.Mapper` class of DataStax Java Driver.
    --- End diff --
    
    -> to annotate the class **as each field of this entity is mapped to an associated column of the designated table using the DataStax Java Driver `com.datastax.driver.mapping.Mapper` class.**


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140620313
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     
     <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.</p>
     
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
     
    -#### Example
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>:However, current Cassandra Sink implementation does not flush the pending mutations  before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. </p>
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    @Override
    -    public Cluster buildCluster(Cluster.Builder builder) {
    -      return builder.addContactPoint("127.0.0.1").build();
    -    }
    -  })
    -  .build();
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.enableCheckpointing(5000); // checkpoint every 5000 msecs
     {% endhighlight %}
     </div>
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    override def buildCluster(builder: Cluster.Builder): Cluster = {
    -      builder.addContactPoint("127.0.0.1").build()
    -    }
    -  })
    -  .build()
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +## Examples
    +
    +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="CQL" markdown="1">
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +    word text,
    +    count bigint,
    +    PRIMARY KEY(word)
    +    );
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Cassandra Sink Example for Streaming Java Tuple Data Type
    +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement.
    +
    +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
    +
    +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.`
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// get the execution environment
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +// get input data by connecting to the socket
    +DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    +
    +// parse the data, group it, window it, and aggregate the counts
    +DataStream<Tuple2<String, Long>> result = text
    +
    +        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    +            @Override
    +            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
    +                // normalize and split the line
    +                String[] words = value.toLowerCase().split("\\W+");
    +
    +                // emit the pairs
    +                for (String word : words) {
    +                    //Do not accept empty word, since word is defined as primary key in C* table
    +                    if (!word.isEmpty()) {
    +                        out.collect(new Tuple2<String, Long>(word, 1L));
    +                    }
    +                }
    +            }
    +        })
    +
    +        .keyBy(0)
    +        .timeWindow(Time.seconds(5))
    +        .sum(1)
    +        ;
    +
    +CassandraSink.addSink(result)
    +        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
    +        .setHost("127.0.0.1")
    +        .build();
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +CassandraSink.addSink(input.javaStream)
    +    .setQuery("INSERT INTO test.wordcount(word, count) values (?, ?);")
    +    .setClusterBuilder(new ClusterBuilder() {
    +        @Override
    +        def buildCluster(builder: Cluster.Builder): Cluster = {
    +            builder.addContactPoint("127.0.0.1").build()
    +        }
    +    })
    +    .build()
     {% endhighlight %}
     </div>
     </div>
     
    -The Cassandra sinks support both tuples and POJO's that use DataStax annotations.
    -Flink automatically detects which type of input is used.
     
    -Example for such a Pojo:
    +### Cassandra Sink Example for Streaming POJO Data Type
    +An example of streaming a POJO data type and store the same POJO entity back to Cassandra. In addition, this POJO implementation needs to follow [DataStax Java Driver Manual](http://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/creating/) to annotate the class as Cassandra connector internally maps each field of this entity to an associated column of the desginated Table using `com.datastax.driver.mapping.Mapper` class of DataStax Java Driver.
    +
    +Please note that if the upsert query was set, an `IllegalArgumentException` would be thrown with the following error message `Specifying a query is not allowed when using a Pojo-Stream as input.`
    +
    +For each CQL defined data type for columns, please refer to [CQL Documentation](https://docs.datastax.com/en/cql/3.1/cql/cql_reference/cql_data_types_c.html)
    --- End diff --
    
    Will rephrase. Thank you.


---

[GitHub] flink issue #4696: [FLINK-7632] [document] Overhaul on Cassandra connector d...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4696
  
    Nice improvement to the docs, had a few comments.


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r141611417
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -35,37 +43,47 @@ To use this connector, add the following dependency to your project:
     </dependency>
     {% endhighlight %}
     
    -Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +
    +## Installing Apache Cassandra
    +There are multiple ways to bring up a Cassandra instance on local machine:
    +
    +1. Follow the instructions from [Cassandra Getting Started page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
    +2. Launch a container running Cassandra from [Official Docker Repository](https://hub.docker.com/_/cassandra/)
     
    -#### Installing Apache Cassandra
    -Follow the instructions from the [Cassandra Getting Started page](http://wiki.apache.org/cassandra/GettingStarted).
    +## Cassandra Sink
    +Flink Cassandra connector currently supports Apache Cassandra as a data sink.
     
    -#### Cassandra Sink
    +### Configurations
     
     Flink's Cassandra sink are created by using the static CassandraSink.addSink(DataStream<IN> input) method.
    --- End diff --
    
    sink -> sinks; remove by (while we're at it)


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r141612269
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -35,37 +43,47 @@ To use this connector, add the following dependency to your project:
     </dependency>
     {% endhighlight %}
     
    -Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +
    +## Installing Apache Cassandra
    +There are multiple ways to bring up a Cassandra instance on local machine:
    +
    +1. Follow the instructions from [Cassandra Getting Started page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
    +2. Launch a container running Cassandra from [Official Docker Repository](https://hub.docker.com/_/cassandra/)
     
    -#### Installing Apache Cassandra
    -Follow the instructions from the [Cassandra Getting Started page](http://wiki.apache.org/cassandra/GettingStarted).
    +## Cassandra Sink
    +Flink Cassandra connector currently supports Apache Cassandra as a data sink.
     
    -#### Cassandra Sink
    +### Configurations
     
     Flink's Cassandra sink are created by using the static CassandraSink.addSink(DataStream<IN> input) method.
    -This method returns a CassandraSinkBuilder, which offers methods to further configure the sink.
    +This method returns a CassandraSinkBuilder, which offers methods to further configure the sink, and finally `build()` the sink instance.
     
     The following configuration methods can be used:
     
    -1. setQuery(String query)
    -2. setHost(String host[, int port])
    -3. setClusterBuilder(ClusterBuilder builder)
    -4. enableWriteAheadLog([CheckpointCommitter committer])
    -5. build()
    +1. _setQuery(String query)_
    +    * sets the upsert query that is executed for every record the sink receives.
    +    * internally treated as CQL prepared statement, in which parameters could be shared or anonymous.
    +    * __DO__ set the upsert query for processing __Tuple__ data type
    +    * __DO NOT__ set the query for processing __POJO__ data type.
    +2. _setClusterBuilder()_
    +    * sets the cluster builder that is used to configure the connection to cassandra with more sophisticated settings such as consistency level, retry policy and etc.
    +3. _setHost(String host[, int port])_
    +    * simple version of setClusterBuilder() with host/port information to connect to Cassandra instances
    +4. _enableWriteAheadLog([CheckpointCommitter committer])_
    +    * an __optional__ setting
    +    * allows exactly-once processing for non-deterministic algorithms.
    +5. _build()_
    +    * finalizes the configuration and constructs the CassandraSink instance.
    --- End diff --
    
    finalizes -> Finalizes


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r141611782
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -35,37 +43,47 @@ To use this connector, add the following dependency to your project:
     </dependency>
     {% endhighlight %}
     
    -Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +
    +## Installing Apache Cassandra
    +There are multiple ways to bring up a Cassandra instance on local machine:
    +
    +1. Follow the instructions from [Cassandra Getting Started page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
    +2. Launch a container running Cassandra from [Official Docker Repository](https://hub.docker.com/_/cassandra/)
     
    -#### Installing Apache Cassandra
    -Follow the instructions from the [Cassandra Getting Started page](http://wiki.apache.org/cassandra/GettingStarted).
    +## Cassandra Sink
    +Flink Cassandra connector currently supports Apache Cassandra as a data sink.
     
    -#### Cassandra Sink
    +### Configurations
     
     Flink's Cassandra sink are created by using the static CassandraSink.addSink(DataStream<IN> input) method.
    -This method returns a CassandraSinkBuilder, which offers methods to further configure the sink.
    +This method returns a CassandraSinkBuilder, which offers methods to further configure the sink, and finally `build()` the sink instance.
     
     The following configuration methods can be used:
     
    -1. setQuery(String query)
    -2. setHost(String host[, int port])
    -3. setClusterBuilder(ClusterBuilder builder)
    -4. enableWriteAheadLog([CheckpointCommitter committer])
    -5. build()
    +1. _setQuery(String query)_
    +    * sets the upsert query that is executed for every record the sink receives.
    --- End diff --
    
    sets -> Sets


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r141611926
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -35,37 +43,47 @@ To use this connector, add the following dependency to your project:
     </dependency>
     {% endhighlight %}
     
    -Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
    +
    +## Installing Apache Cassandra
    +There are multiple ways to bring up a Cassandra instance on local machine:
    +
    +1. Follow the instructions from [Cassandra Getting Started page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
    +2. Launch a container running Cassandra from [Official Docker Repository](https://hub.docker.com/_/cassandra/)
     
    -#### Installing Apache Cassandra
    -Follow the instructions from the [Cassandra Getting Started page](http://wiki.apache.org/cassandra/GettingStarted).
    +## Cassandra Sink
    +Flink Cassandra connector currently supports Apache Cassandra as a data sink.
     
    -#### Cassandra Sink
    +### Configurations
     
     Flink's Cassandra sink are created by using the static CassandraSink.addSink(DataStream<IN> input) method.
    -This method returns a CassandraSinkBuilder, which offers methods to further configure the sink.
    +This method returns a CassandraSinkBuilder, which offers methods to further configure the sink, and finally `build()` the sink instance.
     
     The following configuration methods can be used:
     
    -1. setQuery(String query)
    -2. setHost(String host[, int port])
    -3. setClusterBuilder(ClusterBuilder builder)
    -4. enableWriteAheadLog([CheckpointCommitter committer])
    -5. build()
    +1. _setQuery(String query)_
    +    * sets the upsert query that is executed for every record the sink receives.
    +    * internally treated as CQL prepared statement, in which parameters could be shared or anonymous.
    +    * __DO__ set the upsert query for processing __Tuple__ data type
    --- End diff --
    
    missing period.


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r141613315
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,77 +96,189 @@ Note that that enabling this feature will have an adverse impact on latency.
     
     <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.</p>
     
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
    +
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. </p>
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
     
    -#### Example
    +## Examples
    +
    +The Cassandra sinks currently support both Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="CQL" markdown="1">
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +    word text,
    +    count bigint,
    +    PRIMARY KEY(word)
    +    );
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Cassandra Sink Example for Streaming Tuple Data Type
    +While storing the result with Java/Scala Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement.
    --- End diff --
    
    -> back to **the** database.
    
    I would reword the second part of second sentence to something like ", each Tuple element is converted to parameters of the statement."


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140516135
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     
     <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.</p>
     
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
     
    -#### Example
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>:However, current Cassandra Sink implementation does not flush the pending mutations  before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. </p>
    --- End diff --
    
    theres a double space after mutations


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r141614161
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,77 +96,189 @@ Note that that enabling this feature will have an adverse impact on latency.
     
     <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.</p>
     
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
    +
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. </p>
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
     
    -#### Example
    +## Examples
    +
    +The Cassandra sinks currently support both Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="CQL" markdown="1">
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +    word text,
    +    count bigint,
    +    PRIMARY KEY(word)
    +    );
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Cassandra Sink Example for Streaming Tuple Data Type
    +While storing the result with Java/Scala Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement.
    +
    +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    @Override
    -    public Cluster buildCluster(Cluster.Builder builder) {
    -      return builder.addContactPoint("127.0.0.1").build();
    -    }
    -  })
    -  .build();
    +// get the execution environment
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +// get input data by connecting to the socket
    +DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    +
    +// parse the data, group it, window it, and aggregate the counts
    +DataStream<Tuple2<String, Long>> result = text
    +        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    +            @Override
    +            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
    +                // normalize and split the line
    +                String[] words = value.toLowerCase().split("\\s");
    +
    +                // emit the pairs
    +                for (String word : words) {
    +                    //Do not accept empty word, since word is defined as primary key in C* table
    +                    if (!word.isEmpty()) {
    +                        out.collect(new Tuple2<String, Long>(word, 1L));
    +                    }
    +                }
    +            }
    +        })
    +        .keyBy(0)
    +        .timeWindow(Time.seconds(5))
    +        .sum(1)
    +        ;
    --- End diff --
    
    move to previous line


---

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140516418
  
    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     
     <p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.</p>
     
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
     
    -#### Example
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>:However, current Cassandra Sink implementation does not flush the pending mutations  before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. </p>
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment:
    --- End diff --
    
    imo this is a bit redundant, we are already linking to the checkpoint docs after all.


---