You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Ramya Ramamurthy <ha...@gmail.com> on 2020/07/21 10:42:20 UTC
Flink Redis connectivity
Hi,
As per the understanding we have from the documentation, I guess its not
possible to take the redis connection within the Data Stream. In that case,
how should i proceed ? How can i access a DB client object within the
stream ??
I am using Flink 1.7. any help here would be appreciated. Thanks.
RedisClient redisClient = new
RedisClient(RedisURI.create("redis://localhost:6379"));
RedisConnection<String, String> client =
redisClient.connect();
DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row,
String>) value -> {
String ct = value.getField(5).toString();
String res = "";
if (ct.equals("14") || ct.equals("4")) {
res = client.set("key", "val");
}
return res;
});
Thanks,
Re: Flink Redis connectivity
Posted by Yangze Guo <ka...@gmail.com>.
Do you need to read the state you maintained from Redis? The
flink-connector-redis only contains sink operator.
Best,
Yangze Guo
On Thu, Jul 23, 2020 at 3:28 PM Ramya Ramamurthy <ha...@gmail.com> wrote:
>
> Hi,
>
> Thanks for your response.
>
> I am trying to maintain some state in redis, and for each incoming packet,
> I try to map the information on redis, and then finally use ES as a sink to
> push the data.
> But with this flink-connector-redis, I am not sure if the same can be
> achieved. Can you please elaborate on this , so it would be very helpful.
>
> Thank you.
>
>
> On Wed, Jul 22, 2020 at 9:29 AM Yun Tang <my...@live.com> wrote:
>
> > Hi Ramya
> >
> > Have you ever tried flink-connector-redis<
> > https://github.com/apache/bahir-flink/tree/master/flink-connector-redis>
> > in bahir [1][2]? I think you could use it or obtain some insights.
> >
> > [1] http://bahir.apache.org/docs/flink/current/flink-streaming-redis/
> > [2] https://github.com/apache/bahir-flink
> >
> > Best
> > Yun Tang
> >
> > ________________________________
> > From: Yangze Guo <ka...@gmail.com>
> > Sent: Tuesday, July 21, 2020 18:50
> > To: dev <de...@flink.apache.org>
> > Subject: Re: Flink Redis connectivity
> >
> > Hi,
> >
> > I think you could implement `RichMapFunction` and create `redisClient`
> > in the `open` method.
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Jul 21, 2020 at 6:43 PM Ramya Ramamurthy <ha...@gmail.com>
> > wrote:
> > >
> > > Hi,
> > >
> > > As per the understanding we have from the documentation, I guess its not
> > > possible to take the redis connection within the Data Stream. In that
> > case,
> > > how should i proceed ? How can i access a DB client object within the
> > > stream ??
> > >
> > > I am using Flink 1.7. any help here would be appreciated. Thanks.
> > >
> > > RedisClient redisClient = new
> > > RedisClient(RedisURI.create("redis://localhost:6379"));
> > > RedisConnection<String, String> client =
> > > redisClient.connect();
> > > DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row,
> > > String>) value -> {
> > >
> > > String ct = value.getField(5).toString();
> > >
> > > String res = "";
> > > if (ct.equals("14") || ct.equals("4")) {
> > >
> > > res = client.set("key", "val");
> > > }
> > > return res;
> > > });
> > >
> > > Thanks,
> >
Re: Flink Redis connectivity
Posted by Ramya Ramamurthy <ha...@gmail.com>.
Hi,
Thanks for your response.
I am trying to maintain some state in redis, and for each incoming packet,
I try to map the information on redis, and then finally use ES as a sink to
push the data.
But with this flink-connector-redis, I am not sure if the same can be
achieved. Can you please elaborate on this , so it would be very helpful.
Thank you.
On Wed, Jul 22, 2020 at 9:29 AM Yun Tang <my...@live.com> wrote:
> Hi Ramya
>
> Have you ever tried flink-connector-redis<
> https://github.com/apache/bahir-flink/tree/master/flink-connector-redis>
> in bahir [1][2]? I think you could use it or obtain some insights.
>
> [1] http://bahir.apache.org/docs/flink/current/flink-streaming-redis/
> [2] https://github.com/apache/bahir-flink
>
> Best
> Yun Tang
>
> ________________________________
> From: Yangze Guo <ka...@gmail.com>
> Sent: Tuesday, July 21, 2020 18:50
> To: dev <de...@flink.apache.org>
> Subject: Re: Flink Redis connectivity
>
> Hi,
>
> I think you could implement `RichMapFunction` and create `redisClient`
> in the `open` method.
>
> Best,
> Yangze Guo
>
> On Tue, Jul 21, 2020 at 6:43 PM Ramya Ramamurthy <ha...@gmail.com>
> wrote:
> >
> > Hi,
> >
> > As per the understanding we have from the documentation, I guess its not
> > possible to take the redis connection within the Data Stream. In that
> case,
> > how should i proceed ? How can i access a DB client object within the
> > stream ??
> >
> > I am using Flink 1.7. any help here would be appreciated. Thanks.
> >
> > RedisClient redisClient = new
> > RedisClient(RedisURI.create("redis://localhost:6379"));
> > RedisConnection<String, String> client =
> > redisClient.connect();
> > DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row,
> > String>) value -> {
> >
> > String ct = value.getField(5).toString();
> >
> > String res = "";
> > if (ct.equals("14") || ct.equals("4")) {
> >
> > res = client.set("key", "val");
> > }
> > return res;
> > });
> >
> > Thanks,
>
Re: Flink Redis connectivity
Posted by Yun Tang <my...@live.com>.
Hi Ramya
Have you ever tried flink-connector-redis<https://github.com/apache/bahir-flink/tree/master/flink-connector-redis> in bahir [1][2]? I think you could use it or obtain some insights.
[1] http://bahir.apache.org/docs/flink/current/flink-streaming-redis/
[2] https://github.com/apache/bahir-flink
Best
Yun Tang
________________________________
From: Yangze Guo <ka...@gmail.com>
Sent: Tuesday, July 21, 2020 18:50
To: dev <de...@flink.apache.org>
Subject: Re: Flink Redis connectivity
Hi,
I think you could implement `RichMapFunction` and create `redisClient`
in the `open` method.
Best,
Yangze Guo
On Tue, Jul 21, 2020 at 6:43 PM Ramya Ramamurthy <ha...@gmail.com> wrote:
>
> Hi,
>
> As per the understanding we have from the documentation, I guess its not
> possible to take the redis connection within the Data Stream. In that case,
> how should i proceed ? How can i access a DB client object within the
> stream ??
>
> I am using Flink 1.7. any help here would be appreciated. Thanks.
>
> RedisClient redisClient = new
> RedisClient(RedisURI.create("redis://localhost:6379"));
> RedisConnection<String, String> client =
> redisClient.connect();
> DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row,
> String>) value -> {
>
> String ct = value.getField(5).toString();
>
> String res = "";
> if (ct.equals("14") || ct.equals("4")) {
>
> res = client.set("key", "val");
> }
> return res;
> });
>
> Thanks,
Re: Flink Redis connectivity
Posted by Yangze Guo <ka...@gmail.com>.
Hi,
I think you could implement `RichMapFunction` and create `redisClient`
in the `open` method.
Best,
Yangze Guo
On Tue, Jul 21, 2020 at 6:43 PM Ramya Ramamurthy <ha...@gmail.com> wrote:
>
> Hi,
>
> As per the understanding we have from the documentation, I guess its not
> possible to take the redis connection within the Data Stream. In that case,
> how should i proceed ? How can i access a DB client object within the
> stream ??
>
> I am using Flink 1.7. any help here would be appreciated. Thanks.
>
> RedisClient redisClient = new
> RedisClient(RedisURI.create("redis://localhost:6379"));
> RedisConnection<String, String> client =
> redisClient.connect();
> DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row,
> String>) value -> {
>
> String ct = value.getField(5).toString();
>
> String res = "";
> if (ct.equals("14") || ct.equals("4")) {
>
> res = client.set("key", "val");
> }
> return res;
> });
>
> Thanks,
答复: Flink Redis connectivity
Posted by 范超 <fa...@mgtv.com>.
Hi Ramya
I just tried to code the example which is worked in 1.10 which I using a custom RichFlatMapFunction to connect ,transform data and release the conn in its override method.
// app.java
public class RedisMapDemo {
public static void main(String[] args) throws Exception {
// 1. source
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
DataStream<String> sourceStream = env.fromElements("test_value");
// 2. custom map function
DataStream<String> redisUpdatedStream = sourceStream.flatMap(new RedisFlatMap());
redisUpdatedStream.print();
env.execute("testing redis flatmap");
}
}
// this should be saved as another java file (RedisFlatMap.java)
public class RedisFlatMap extends RichFlatMapFunction<String, String> {
String TEST_REDIS_KEY = "my_first_lettuce_key";
RedisClient redisClient;
StatefulRedisConnection<String, String> connection;
RedisCommands<String, String> syncCommands;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
redisClient = RedisClient.create("redis://localhost:6379/0");
connection = redisClient.connect();
syncCommands = connection.sync();
}
@Override
public void close() throws Exception {
super.close();
// maybe release conn here ?
connection.close();
redisClient.shutdown();
}
@Override
public void flatMap(String inputString, Collector<String> out)
throws Exception {
// 1. write to redis
// syncCommands.set(TEST_REDIS_KEY, " Hello, Redis!");
// 2. read from redis
String tmpValue = syncCommands.get(TEST_REDIS_KEY);
// 3. transform
out.collect(inputString + " - " + tmpValue);
}
}
-----邮件原件-----
发件人: Ramya Ramamurthy [mailto:hairums@gmail.com]
发送时间: 2020年7月21日 星期二 18:42
收件人: dev@flink.apache.org
主题: Flink Redis connectivity
Hi,
As per the understanding we have from the documentation, I guess its not possible to take the redis connection within the Data Stream. In that case, how should i proceed ? How can i access a DB client object within the stream ??
I am using Flink 1.7. any help here would be appreciated. Thanks.
RedisClient redisClient = new
RedisClient(RedisURI.create("redis://localhost:6379"));
RedisConnection<String, String> client = redisClient.connect(); DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row,
String>) value -> {
String ct = value.getField(5).toString();
String res = "";
if (ct.equals("14") || ct.equals("4")) {
res = client.set("key", "val");
}
return res;
});
Thanks,