You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by AndreaKinn <ki...@hotmail.it> on 2017/08/08 18:43:21 UTC

Writing on Cassandra

Hi,
I'm trying to integrate a Cassandra sink in my project but honestly I'm a
bit confused because I don't find any examples of use.

I want just to populate a table and query it on a single node instance of
Cassandra.

The only one link I found is: 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/cassandra.html
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/cassandra.html>  

but I have problems also with imports:

import org.apache.flink.streaming.connectors.cassandra.CassandraTupleSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

not recognised by eclipse.

I have added dependency in pom.xml file:

<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-cassandra_2.10</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
</dependency>


Please, can you provide me some examples of use of Cassandra and clarify me
why Cassandra classes are not recognised?

Thanks in advance, 
Andrea



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Writing-on-Cassandra-tp14744.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Writing on Cassandra

Posted by Nico Kruber <ni...@data-artisans.com>.
If I see this correctly in the code, the CassandraSink is using the value of 
its input stream automatically, so in your case
Tuple2<String, Tuple6<String, String, Date, String, String, Double>>

What you want is it to use only
Tuple6<String, String, Date, String, String, Double>
without the first first part of Tuple2 (probably your key?).

A simple map function before adding the sink should to the trick:

DataStream<Tuple2<String, Tuple6<String, String, Date, String, String, 
Double>>> dataStream = //...
dataStream.map(new MapFunction<Tuple2<String, Tuple6<String, String, Date, 
String, String, Double>>, Tuple6<String, String, Date, String, String, 
Double>>() {
    @Override
    public Tuple6<String, String, Date, String, String, Double> 
map(Tuple2<String, Tuple6<String, String, Date, String, String, Double>> 
value) throws Exception {
        return value.f1;
    }
});


Nico

On Sunday, 13 August 2017 19:23:54 CEST AndreaKinn wrote:
> Ok, this is my situation:
> 
> I have a stream of Tuple2<String, Tuple6&lt;String, String, Date, String,
> String, Double>>
> 
> the cassandra code:
> 
> CassandraSink.addSink(stream)
> 		  .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
>                 + " (user, sensor, timestamp, json_ld, observed_value,
> value)"
>                 + " VALUES (?, ?, ?, ?, ?, ?);")
> 		  .setClusterBuilder(new ClusterBuilder() {
> 		    @Override
> 		    public Cluster buildCluster(Cluster.Builder builder) {
> 		      return builder.addContactPoint("127.0.0.1").build();
> 		    }
> 		  })
> 		  .build();
> 
> the values to insert in VALUES clause are exactly the values of Tuple6. How
> can I indicate that to my statement ?
> 
> 
> 
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Writing
> -on-Cassandra-tp14744p14867.html Sent from the Apache Flink User Mailing
> List archive. mailing list archive at Nabble.com.


Re: Writing on Cassandra

Posted by AndreaKinn <ki...@hotmail.it>.
Ok, this is my situation:

I have a stream of Tuple2<String, Tuple6&lt;String, String, Date, String,
String, Double>>

the cassandra code:

CassandraSink.addSink(stream)
		  .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
                + " (user, sensor, timestamp, json_ld, observed_value,
value)"
                + " VALUES (?, ?, ?, ?, ?, ?);")
		  .setClusterBuilder(new ClusterBuilder() {
		    @Override
		    public Cluster buildCluster(Cluster.Builder builder) {
		      return builder.addContactPoint("127.0.0.1").build();
		    }
		  })
		  .build();

the values to insert in VALUES clause are exactly the values of Tuple6. How
can I indicate that to my statement ?



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Writing-on-Cassandra-tp14744p14867.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Writing on Cassandra

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

In the doc section about Cassandra there is actually an example: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/cassandra.html <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/cassandra.html>.

In a Flink Job you would therefore roughly do this:

StreamExecutionEnvironment env = ...:

DataStream<> input = env.addSource(...);

CassandraSink.addSink(input)
  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
  .setClusterBuilder(new ClusterBuilder() {
    @Override
    public Cluster buildCluster(Cluster.Builder builder) {
      return builder.addContactPoint("127.0.0.1").build();
    }
  }).build();

env.execute();

Best,
Aljoscha

> On 8. Aug 2017, at 21:08, AndreaKinn <ki...@hotmail.it> wrote:
> 
> I probably solved import issue, but still need help to find some examples of
> use.
> Please let me know if someone has experience with Flink and Cassandra
> together
> 
> 
> 
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Writing-on-Cassandra-tp14744p14745.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.


Re: Writing on Cassandra

Posted by AndreaKinn <ki...@hotmail.it>.
I probably solved import issue, but still need help to find some examples of
use.
Please let me know if someone has experience with Flink and Cassandra
together



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Writing-on-Cassandra-tp14744p14745.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.