Posted to user@flink.apache.org by AndreaKinn <ki...@hotmail.it> on 2017/08/08 18:43:21 UTC
Writing on Cassandra
Hi,
I'm trying to integrate a Cassandra sink into my project, but honestly I'm a
bit confused because I can't find any usage examples.
I just want to populate a table and query it on a single-node instance of
Cassandra.
The only link I found is:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/cassandra.html
but I also have problems with the imports:
import org.apache.flink.streaming.connectors.cassandra.CassandraTupleSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
These are not recognised by Eclipse.
I have added the dependency to my pom.xml file:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-cassandra_2.10</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
Could you please provide some examples of using Cassandra with Flink and explain
why the Cassandra classes are not recognised?
Thanks in advance,
Andrea
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Writing-on-Cassandra-tp14744.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
Re: Writing on Cassandra
Posted by Nico Kruber <ni...@data-artisans.com>.
If I read the code correctly, the CassandraSink automatically uses the elements
of its input stream as they are, so in your case
Tuple2<String, Tuple6<String, String, Date, String, String, Double>>
What you want is for it to use only
Tuple6<String, String, Date, String, String, Double>
without the first part of the Tuple2 (probably your key?).
A simple map function before adding the sink should do the trick:
DataStream<Tuple2<String, Tuple6<String, String, Date, String, String, Double>>> dataStream = //...
// Keep only the Tuple6 payload (f1) and drop the first field (f0).
// Pass the resulting stream, not dataStream, to CassandraSink.addSink(...).
DataStream<Tuple6<String, String, Date, String, String, Double>> values = dataStream.map(
    new MapFunction<Tuple2<String, Tuple6<String, String, Date, String, String, Double>>,
                    Tuple6<String, String, Date, String, String, Double>>() {
        @Override
        public Tuple6<String, String, Date, String, String, Double> map(
                Tuple2<String, Tuple6<String, String, Date, String, String, Double>> value) throws Exception {
            return value.f1;
        }
    });
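To make the reasoning above concrete, here is a minimal plain-Java sketch of why the unmapped stream does not fit the query. It does not use Flink or the Cassandra driver: the class TupleBindingSketch, the Keyed type (a stand-in for Tuple2) and the helpers placeholders, payloadOf and bindable are all hypothetical names invented for this illustration. It assumes the sink binds one value per top-level tuple field, which is how I read the code, so treat it as a model of the idea rather than the connector's actual implementation.

```java
import java.util.Arrays;
import java.util.Date;
import java.util.List;

class TupleBindingSketch {

    /** Hypothetical stand-in for Flink's Tuple2: f0 is the key, f1 the payload. */
    static final class Keyed<P> {
        final String f0;
        final P f1;

        Keyed(String f0, P f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    /** Counts the '?' placeholders in a CQL statement. */
    static int placeholders(String query) {
        int count = 0;
        for (char c : query.toCharArray()) {
            if (c == '?') {
                count++;
            }
        }
        return count;
    }

    /** Models the map step: drop the key (f0) and keep the payload (f1). */
    static List<Object> payloadOf(Keyed<List<Object>> record) {
        return record.f1;
    }

    /** True when there is exactly one field per placeholder. */
    static boolean bindable(String query, List<Object> fields) {
        return placeholders(query) == fields.size();
    }

    public static void main(String[] args) {
        String query = "INSERT INTO keyspace_local.values_by_sensors_users"
                + " (user, sensor, timestamp, json_ld, observed_value, value)"
                + " VALUES (?, ?, ?, ?, ?, ?);";

        // Made-up sample values standing in for the six Tuple6 fields.
        List<Object> payload = Arrays.<Object>asList(
                "user-1", "sensor-1", new Date(0L), "{}", "observed", 1.0);
        Keyed<List<Object>> record = new Keyed<>("key-1", payload);

        // Unmapped: the outer record has only two top-level fields (f0 and f1).
        List<Object> unmapped = Arrays.<Object>asList(record.f0, record.f1);
        System.out.println("unmapped bindable: " + bindable(query, unmapped));

        // Mapped: the payload has one field per placeholder.
        System.out.println("mapped bindable: " + bindable(query, payloadOf(record)));
    }
}
```

In this model the unmapped record offers two fields for six placeholders, while the mapped payload offers six, which is the mismatch the map function removes.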
Nico
On Sunday, 13 August 2017 19:23:54 CEST AndreaKinn wrote:
> Ok, this is my situation:
>
> I have a stream of Tuple2<String, Tuple6<String, String, Date, String,
> String, Double>>
>
> the cassandra code:
>
> CassandraSink.addSink(stream)
>     .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
>         + " (user, sensor, timestamp, json_ld, observed_value, value)"
>         + " VALUES (?, ?, ?, ?, ?, ?);")
>     .setClusterBuilder(new ClusterBuilder() {
>         @Override
>         public Cluster buildCluster(Cluster.Builder builder) {
>             return builder.addContactPoint("127.0.0.1").build();
>         }
>     })
>     .build();
>
> The values to insert in the VALUES clause are exactly the fields of the
> Tuple6. How can I express that in my statement?
Re: Writing on Cassandra
Posted by AndreaKinn <ki...@hotmail.it>.
Ok, this is my situation:
I have a stream of Tuple2<String, Tuple6<String, String, Date, String,
String, Double>>
the cassandra code:
CassandraSink.addSink(stream)
    .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
        + " (user, sensor, timestamp, json_ld, observed_value, value)"
        + " VALUES (?, ?, ?, ?, ?, ?);")
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        public Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("127.0.0.1").build();
        }
    })
    .build();
The values to insert in the VALUES clause are exactly the fields of the
Tuple6. How can I express that in my statement?
Re: Writing on Cassandra
Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
In the doc section about Cassandra there is actually an example: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/cassandra.html
In a Flink job you would therefore do roughly this:
StreamExecutionEnvironment env = ...;
DataStream<> input = env.addSource(...);
CassandraSink.addSink(input)
    .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        public Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("127.0.0.1").build();
        }
    })
    .build();
env.execute();
Best,
Aljoscha
> On 8. Aug 2017, at 21:08, AndreaKinn <ki...@hotmail.it> wrote:
>
> I have probably solved the import issue, but I still need help finding some
> usage examples.
> Please let me know if anyone has experience using Flink and Cassandra
> together.
Re: Writing on Cassandra
Posted by AndreaKinn <ki...@hotmail.it>.
I have probably solved the import issue, but I still need help finding some
usage examples.
Please let me know if anyone has experience using Flink and Cassandra
together.