Posted to dev@flink.apache.org by Meghashyam Sandeep V <vr...@gmail.com> on 2016/12/09 18:26:31 UTC

Reg. custom sinks in Flink

Hi there,

I have a Flink streaming app where my source is Kafka and a custom sink to
Cassandra (I can't use the standard C* sink that comes with Flink, as I have
customized auth to C*). I currently have the following:

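messageStream
        .rebalance()
        // parse each Kafka record into a Jackson tree
        .map(s -> mapper.readValue(s, JsonNode.class))
        .filter(message -> true /* filter some messages */)
        .map((MapFunction<JsonNode, String>) message -> {
            // side effect: write to Cassandra from inside a map operator
            getDbSession().execute("QUERY_TO_EXEC");
            // a MapFunction has to return a value
            return message.toString();
        });

private static Session getDbSession() {
    if (dbSession == null && store != null) {
        dbSession = getSession();
    }

    return dbSession;
}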

1. Is this the right way to add a custom sink? As you can see, I have
dbSession as a class variable here and I'm storing its state.

2. This setup works fine in a standalone Flink (java -jar MyJar.jar).
When I run it using Flink on YARN on EMR, I get an NPE at the session,
which is kind of weird.


Thanks
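One detail in the getDbSession() helper above: the null check and the assignment to a static field are not synchronized. Subtasks of a Flink job run as threads inside a TaskManager JVM, so two of them can pass the check at the same time and each open a session. A minimal sketch of a thread-safe, reference-counted holder follows; SharedResource is a hypothetical name, and the Supplier stands in for whatever code builds the real Cassandra session.

```java
import java.util.function.Supplier;

// Hypothetical JVM-wide holder for one shared resource (e.g. a driver Session).
public final class SharedResource<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private T instance;     // the shared resource, or null when nobody holds it
    private int refCount;   // callers that acquired and have not yet released

    public SharedResource(Supplier<T> factory) {
        this.factory = factory;
    }

    // Creates the resource on first use; later callers get the same instance.
    public synchronized T acquire() {
        if (instance == null) {
            instance = factory.get();
        }
        refCount++;
        return instance;
    }

    // Closes the resource once the last holder releases it.
    public synchronized void release() {
        if (refCount == 0) {
            throw new IllegalStateException("release() called without a matching acquire()");
        }
        refCount--;
        if (refCount == 0) {
            T toClose = instance;
            instance = null;
            try {
                toClose.close();
            } catch (Exception e) {
                throw new IllegalStateException("failed to close shared resource", e);
            }
        }
    }

    public synchronized int refCount() {
        return refCount;
    }
}
```

A sink would call acquire() from its open() method and release() from close(), which gives one session per JVM instead of one per subtask or per message.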

Re: Reg. custom sinks in Flink

Posted by Chesnay Schepler <ch...@apache.org>.
Hello,

The query is generated automatically from the POJO by the DataStax
MappingManager in the CassandraPojoSink; Flink isn't generating anything
itself.

On the MappingManager you can set the TTL for all queries (it also
allows some other settings). So, to allow the user to set the TTL, we must
add a hook to configure the MappingManager; this can be done the same way
the Cluster is configured using the ClusterBuilder.

Regards,
Chesnay
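The idea that the INSERT statement is derived from the POJO, and that a TTL option simply adds a clause to it, can be illustrated with a small stand-alone sketch. This is not the DataStax MappingManager (which reads its own @Table/@Column annotations); Message, the keyspace and the table name are hypothetical. In the DataStax 3.x driver a TTL is, as far as I know, set through the mapper's save options (e.g. `Mapper.Option.ttl(...)`); check this against the driver version in use.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

public class PojoInsertSketch {

    // Hypothetical POJO standing in for the stream's element type.
    public static class Message {
        public String id;
        public String payload;
        public long createdAt;
    }

    // Builds "INSERT ... VALUES (?, ...) [USING TTL n]" from the POJO's instance fields.
    // Columns are sorted for deterministic output; unquoted CQL identifiers are
    // case-insensitive, so they are lower-cased here.
    public static String buildInsert(Class<?> pojo, String keyspace, String table, int ttlSeconds) {
        List<String> columns = new ArrayList<>();
        for (Field field : pojo.getDeclaredFields()) {
            if (!Modifier.isStatic(field.getModifiers()) && !field.isSynthetic()) {
                columns.add(field.getName().toLowerCase(Locale.ROOT));
            }
        }
        Collections.sort(columns);
        String markers = String.join(", ", Collections.nCopies(columns.size(), "?"));
        StringBuilder cql = new StringBuilder()
                .append("INSERT INTO ").append(keyspace).append('.').append(table)
                .append(" (").append(String.join(", ", columns)).append(')')
                .append(" VALUES (").append(markers).append(')');
        if (ttlSeconds > 0) {
            // The TTL applies to the columns written by this statement.
            cql.append(" USING TTL ").append(ttlSeconds);
        }
        return cql.toString();
    }

    public static void main(String[] args) {
        // prints: INSERT INTO ks.messages (createdat, id, payload) VALUES (?, ?, ?) USING TTL 86400
        System.out.println(buildInsert(Message.class, "ks", "messages", 86400));
    }
}
```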

On 12.12.2016 19:12, Meghashyam Sandeep V wrote:
> Thank you Till. I wanted to contribute to Flink. Looks like this
> could be a good start. I couldn't find the place where the insert
> query is built for Pojo sinks in CassandraSink.java,
> CassandraPojoSink.java, or CassandraSinkBase.java. Could you shed
> some light on how that insert query is built automatically by the
> sink?
>
> Thanks,
>
> On Mon, Dec 12, 2016 at 7:56 AM, Till Rohrmann <trohrmann@apache.org>
> wrote:
>
>     (1) A subtask is a parallel instance of an operator and thus
>     responsible for a partition (possibly infinite) of the whole
>     DataStream/DataSet.
>
>     (2) Maybe you can add this feature to Flink's Cassandra Sink.
>
>     Cheers,
>     Till
>
>     On Mon, Dec 12, 2016 at 4:30 PM, Meghashyam Sandeep V
>     <vr1meghashyam@gmail.com> wrote:
>
>         Data piles up in Cassandra without TTL. Is there a workaround
>         for this problem? Is there a way to specify my query and still
>         use Pojo?
>
>         Thanks,
>
>         On Mon, Dec 12, 2016 at 7:26 AM, Chesnay Schepler
>         <chesnay@apache.org> wrote:
>
>             Regarding 2), I don't think so. That would require access
>             to the DataStax MappingManager.
>             We could add something similar to the ClusterBuilder for
>             that, though.
>
>             Regards,
>             Chesnay
>
>
>             On 12.12.2016 16:15, Meghashyam Sandeep V wrote:
>>             Hi Till,
>>
>>             Thanks for the information.
>>
>>             1. What do you mean by 'subtask'? Is it every partition
>>             or every message in the stream?
>>
>>             2. I tried using CassandraSink with a Pojo. Is there a
>>             way to specify TTL, as I can't use a query when I have a
>>             datastream with Pojo?
>>
>>             CassandraSink.addSink(messageStream)
>>                       .setClusterBuilder(new ClusterBuilder() {
>>                           @Override protected Cluster buildCluster(Cluster.Builder builder) {
>>                               return buildCassandraCluster();
>>                           }
>>                       })
>>                       .build();
>>             Thanks,
>>             On Mon, Dec 12, 2016 at 3:17 AM, Till Rohrmann
>>             <trohrmann@apache.org> wrote:
>>
>>                 Hi Meghashyam,
>>
>>                 1.
>>
>>                     You can perform initializations in the open
>>                     method of the RichSinkFunction abstract class. The
>>                     open method will be called once for every
>>                     subtask when initializing it. If you want to share
>>                     the resource across multiple subtasks running in
>>                     the same JVM, you can also store the dbSession
>>                     in a class variable.
>>
>>                 2.
>>
>>                     The Flink community is currently working on
>>                     adding security support, including SSL encryption,
>>                     to Flink. So maybe in the future you can use
>>                     Flink’s Cassandra sink again.
>>
>>                 Cheers, Till
>>
>>                 On Fri, Dec 9, 2016 at 8:05 PM, Meghashyam Sandeep V
>>                 <vr1meghashyam@gmail.com> wrote:
>>
>>                     Thanks a lot for the quick reply, Shannon.
>>
>>                     1. I will create a class that extends
>>                     SinkFunction and write my connection logic there.
>>                     My only question here is: will a dbSession be
>>                     created for each message/partition, which might
>>                     affect performance? That's the reason why I
>>                     added this line to create a connection once and
>>                     use it along the datastream: if(dbSession == null
>>                     && store != null) { dbSession = getSession(); }
>>
>>                     2. I couldn't use flink-connector-cassandra as I
>>                     have SSL enabled for my C* cluster and I couldn't
>>                     get it to work with all my SSL config (truststore,
>>                     keystore, etc.) added to cluster building. I
>>                     didn't find a proper example of
>>                     flink-connector-cassandra with SSL enabled.
>>
>>                     Thanks
>>
>>                     On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey
>>                     <scarey@expedia.com> wrote:
>>
>>                         You haven't really added a sink in Flink
>>                         terminology; you're just performing a side
>>                         effect within a map operator. So while it may
>>                         work, if you want to add a proper sink you
>>                         need to have an object that extends SinkFunction
>>                         or RichSinkFunction. The method call on the
>>                         stream should be ".addSink(…)".
>>
>>                         Also, the dbSession isn't really Flink state,
>>                         as it will not vary based on the position in
>>                         or content of the stream. It's a necessary
>>                         helper object, yes, but you don't need Flink
>>                         to checkpoint it.
>>
>>                         You can still use the sinks provided with
>>                         flink-connector-cassandra and customize the
>>                         cluster building by passing your
>>                         own ClusterBuilder into the constructor.
>>
>>                         -Shannon
>>
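Shannon's and Till's suggestions (implement a sink function, create the session in open(), and don't treat the session as Flink state) can be illustrated without a Flink dependency. The sketch below only mimics the open/invoke/close shape of Flink's RichSinkFunction; CassandraLikeSink and FakeSession are hypothetical names. It also shows one plausible reason for the NPE on YARN: the sink object is serialized on the client and deserialized in the TaskManager JVM, so transient fields, and static fields that were only initialized in the client's main(), are null there until open() runs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

public class SinkLifecycleSketch {

    // Hypothetical stand-in for a driver Session; deliberately not Serializable.
    public static class FakeSession implements AutoCloseable {
        public static final AtomicInteger OPENED = new AtomicInteger();
        private int executed;

        public FakeSession() { OPENED.incrementAndGet(); }
        public void execute(String cql) { executed++; }
        public int executedCount() { return executed; }
        @Override public void close() { }
    }

    // Mirrors the open/invoke/close shape of a RichSinkFunction without depending on Flink.
    public static class CassandraLikeSink implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String query;             // configuration: travels with the sink
        private transient FakeSession session;  // runtime resource: rebuilt on the worker

        public CassandraLikeSink(String query) { this.query = query; }

        public void open() { session = new FakeSession(); }           // once per subtask
        public void invoke(String value) { session.execute(query); }  // once per record
        public void close() { if (session != null) session.close(); }
        public FakeSession session() { return session; }
    }

    // Java serialization round trip, roughly what shipping a function to a worker JVM involves.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        CassandraLikeSink shipped = roundTrip(new CassandraLikeSink("INSERT ..."));
        System.out.println("session after shipping: " + shipped.session()); // null until open()
        shipped.open();
        for (int i = 0; i < 3; i++) {
            shipped.invoke("message-" + i);
        }
        System.out.println("executed: " + shipped.session().executedCount());
        shipped.close();
    }
}
```

In a real job the same shape would be a class extending RichSinkFunction whose open(...) builds the session and whose invoke(...) executes the statement, attached with .addSink(...); the exact method signatures differ between Flink versions.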

Re: Reg. custom sinks in Flink

Posted by Meghashyam Sandeep V <vr...@gmail.com>.
Thank you Till. I wanted to contribute to Flink. Looks like this could
be a good start. I couldn't find the place where the insert query is built
for Pojo sinks in CassandraSink.java, CassandraPojoSink.java, or
CassandraSinkBase.java. Could you shed some light on how that insert
query is built automatically by the sink?

Thanks,

Re: Reg. custom sinks in Flink

Posted by Till Rohrmann <tr...@apache.org>.
(1) A subtask is a parallel instance of an operator and thus responsible
for a partition (possibly infinite) of the whole DataStream/DataSet.

(2) Maybe you can add this feature to Flink's Cassandra Sink.

Cheers,
Till
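Till's definition of a subtask can be pictured with a simplified model: with a parallelism of 2, each of the two sink subtasks receives its own share of the stream, so a per-subtask open() runs twice, not once per message. The sketch below assigns records round-robin, which is roughly what rebalance() does; it is a plain-Java illustration, not Flink's partitioner (which works per upstream subtask and need not start at subtask 0).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SubtaskPartitionSketch {

    // Assigns records round-robin to `parallelism` subtasks; each inner list is
    // the partition a single subtask would process.
    public static <T> List<List<T>> rebalance(List<T> records, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            partitions.get(i % parallelism).add(records.get(i));
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("m0", "m1", "m2", "m3", "m4");
        List<List<String>> partitions = rebalance(records, 2);
        for (int subtask = 0; subtask < partitions.size(); subtask++) {
            // A sink's open() would run once per subtask, then invoke() once per record.
            System.out.println("subtask " + subtask + " -> " + partitions.get(subtask));
        }
        // prints:
        // subtask 0 -> [m0, m2, m4]
        // subtask 1 -> [m1, m3]
    }
}
```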

Re: Reg. custom sinks in Flink

Posted by Meghashyam Sandeep V <vr...@gmail.com>.
Data piles up in Cassandra without TTL. Is there a workaround for this
problem? Is there a way to specify my query and still use Pojo?

Thanks,

Re: Reg. custom sinks in Flink

Posted by Chesnay Schepler <ch...@apache.org>.
Regarding 2), I don't think so. That would require access to the DataStax
MappingManager.
We could add something similar to the ClusterBuilder for that, though.

Regards,
Chesnay
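A hook "similar to the ClusterBuilder" relies on the same idea the ClusterBuilder uses: the user hands the sink a small serializable object, and the non-serializable driver objects are only created from it on the worker. The sketch below models that pattern in plain Java; MapperOptionsBuilder, PojoSink and OneDayTtl are hypothetical names, not Flink classes, and the hook returns a bare TTL instead of configuring a real DataStax mapper.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class MapperHookSketch {

    // Hypothetical hook modeled on ClusterBuilder: an abstract, Serializable class the
    // user subclasses on the client; the sink keeps it and only calls it on the worker.
    public abstract static class MapperOptionsBuilder implements Serializable {
        private static final long serialVersionUID = 1L;

        // A real hook would configure the mapper; this sketch only returns a TTL.
        protected abstract int ttlSeconds();
    }

    // Hypothetical sink that stores the serializable hook, never a live mapper or session.
    public static class PojoSink implements Serializable {
        private static final long serialVersionUID = 1L;
        private final MapperOptionsBuilder optionsBuilder;
        private transient String saveSuffix; // derived on the worker in open()

        public PojoSink(MapperOptionsBuilder optionsBuilder) {
            this.optionsBuilder = optionsBuilder;
        }

        public void open() {
            int ttl = optionsBuilder.ttlSeconds();
            saveSuffix = ttl > 0 ? " USING TTL " + ttl : "";
        }

        public String saveSuffix() {
            return saveSuffix;
        }
    }

    // Example user configuration: keep rows for one day.
    public static class OneDayTtl extends MapperOptionsBuilder {
        private static final long serialVersionUID = 1L;

        @Override
        protected int ttlSeconds() {
            return 86_400;
        }
    }

    // Java serialization round trip, standing in for shipping the sink to a worker JVM.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        PojoSink sink = roundTrip(new PojoSink(new OneDayTtl()));
        sink.open();
        // prints: INSERT INTO ks.messages (id, payload) VALUES (?, ?) USING TTL 86400
        System.out.println("INSERT INTO ks.messages (id, payload) VALUES (?, ?)" + sink.saveSuffix());
    }
}
```

One thing worth checking with the anonymous `new ClusterBuilder() {...}` in the snippet from this thread: an anonymous class created inside an instance method captures the enclosing object, which then has to be serializable too; creating it in a static context or as a static nested class avoids that.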

Re: Reg. custom sinks in Flink

Posted by Meghashyam Sandeep V <vr...@gmail.com>.
Hi Till,

Thanks for the information.

1. What do you mean by 'subtask'? Is it every partition or every message in
the stream?

2. I tried using CassandraSink with a Pojo. Is there a way to specify TTL,
as I can't use a query when I have a datastream with Pojo?

CassandraSink.addSink(messageStream)
         .setClusterBuilder(new ClusterBuilder() {
             @Override
             protected Cluster buildCluster(Cluster.Builder builder) {
                 return buildCassandraCluster();
             }
         })
         .build();


Thanks,


On Mon, Dec 12, 2016 at 3:17 AM, Till Rohrmann <tr...@apache.org> wrote:

> Hi Meghashyam,
>
>    1. You can perform initializations in the open method of a
>       RichSinkFunction. The open method is called once for every
>       subtask, when that subtask is initialized. If you want to share the
>       resource across multiple subtasks running in the same JVM, you can
>       also store the dbSession in a static (class) variable.
>
>    2. The Flink community is currently working on adding security support,
>       including SSL encryption, to Flink. So in the future you may be able
>       to use Flink’s Cassandra sink again.
>
> Cheers,
> Till
>
> On Fri, Dec 9, 2016 at 8:05 PM, Meghashyam Sandeep V <
> vr1meghashyam@gmail.com> wrote:
>
>> Thanks a lot for the quick reply, Shannon.
>>
>> 1. I will create a class that extends SinkFunction and write my
>> connection logic there. My only question is: will a dbSession be
>> created for each message/partition, which might affect performance?
>> That's why I added this line, to create a connection once and use
>> it throughout the datastream:
>> if (dbSession == null && store != null) { dbSession = getSession(); }
>>
>> 2. I couldn't use flink-connector-cassandra because I have SSL enabled for
>> my C* cluster, and I couldn't get it to work with all my SSL config
>> (truststore, keystore, etc.) added to the cluster building. I didn't find a
>> proper example of flink-connector-cassandra with SSL enabled.
>>
>>
>> Thanks
>>
>>
>>
>>
>> On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey <sc...@expedia.com>
>> wrote:
>>
>>> You haven't really added a sink in Flink terminology; you're just
>>> performing a side effect within a map operator. So while it may work, if
>>> you want to add a proper sink, you need an object that extends
>>> SinkFunction or RichSinkFunction. The method call on the stream should be
>>> ".addSink(…)".
>>>
>>> Also, the dbSession isn't really Flink state, as it does not vary with
>>> the position in, or the content of, the stream. It's a necessary helper
>>> object, yes, but you don't need Flink to checkpoint it.
>>>
>>> You can still use the sinks provided with flink-connector-cassandra and
>>> customize the cluster building by passing your own ClusterBuilder into the
>>> constructor.
>>>
>>> -Shannon
>>>
>>> From: Meghashyam Sandeep V <vr...@gmail.com>
>>> Date: Friday, December 9, 2016 at 12:26 PM
>>> To: <us...@flink.apache.org>, <de...@flink.apache.org>
>>> Subject: Reg. custom sinks in Flink
>>>
>>> Hi there,
>>>
>>> I have a Flink streaming app where my source is Kafka and a custom sink
>>> to Cassandra (I can't use the standard C* sink that comes with Flink, as I
>>> have customized auth to C*). I currently have the following:
>>>
>>> messageStream
>>>         .rebalance()
>>>         .map(s -> {
>>>             return mapper.readValue(s, JsonNode.class);
>>>         })
>>>         .filter(message -> true) // placeholder: filter some messages
>>>         .map((MapFunction<JsonNode, String>) message -> {
>>>             getDbSession().execute("QUERY_TO_EXEC");
>>>             return message.toString(); // a MapFunction<JsonNode, String> must return a String
>>>         });
>>>
>>> private static Session getDbSession() {
>>>     if (dbSession == null && store != null) {
>>>         dbSession = getSession();
>>>     }
>>>
>>>     return dbSession;
>>> }
>>>
>>> 1. Is this the right way to add a custom sink? As you can see, I have dbSession as a class variable here and I'm storing its state.
>>>
>>> 2. This setup works fine in standalone Flink (java -jar MyJar.jar). When I run it using Flink on YARN on EMR, I get an NPE at the session, which is kind of weird.
>>>
>>>
>>> Thanks
>>>
>>>
>>
>

Re: Reg. custom sinks in Flink

Posted by Till Rohrmann <tr...@apache.org>.
Hi Meghashyam,

   1. You can perform initializations in the open method of a
      RichSinkFunction. The open method is called once for every
      subtask, when that subtask is initialized. If you want to share the
      resource across multiple subtasks running in the same JVM, you can
      also store the dbSession in a static (class) variable.

   2. The Flink community is currently working on adding security support,
      including SSL encryption, to Flink. So in the future you may be able
      to use Flink’s Cassandra sink again.
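To make the "subtask" part concrete: a subtask is one parallel instance of the sink operator, so `open()` runs once per parallel instance, not once per message. The JDK-only sketch below mirrors the `open`/`invoke`/`close` lifecycle of a `RichSinkFunction` without depending on Flink, and shares a single session among all instances in the JVM through a static, reference-counted holder, which is one way to implement the class-variable option described above. `FakeSession`, `SessionHolder` and `SinkInstance` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedSessionDemo {

    static final AtomicInteger SESSIONS_CREATED = new AtomicInteger();

    /** Hypothetical stand-in for a driver Session. */
    static final class FakeSession implements AutoCloseable {
        private volatile boolean closed;

        FakeSession() { SESSIONS_CREATED.incrementAndGet(); }

        void execute(String query) {
            if (closed) throw new IllegalStateException("session already closed");
        }

        @Override
        public void close() { closed = true; }
    }

    /** One session per JVM, created by the first subtask and closed by the last one. */
    static final class SessionHolder {
        private static FakeSession session;
        private static int users;

        // synchronized because, in a TaskManager, each subtask runs in its own thread
        static synchronized FakeSession acquire() {
            if (session == null) {
                session = new FakeSession();
            }
            users++;
            return session;
        }

        static synchronized void release() {
            users--;
            if (users == 0) {
                session.close();
                session = null;
            }
        }
    }

    /** Mirrors the open/invoke/close lifecycle of a RichSinkFunction, without Flink. */
    static final class SinkInstance {
        private FakeSession session;
        int opens;
        int invokes;

        void open() { opens++; session = SessionHolder.acquire(); }

        void invoke(String value) { invokes++; session.execute("QUERY_TO_EXEC"); }

        void close() { SessionHolder.release(); session = null; }
    }

    /** Runs 'parallelism' sink instances over 'records' messages, round-robin like rebalance(). */
    static List<SinkInstance> run(int parallelism, int records) {
        List<SinkInstance> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            SinkInstance sink = new SinkInstance();
            sink.open(); // once per subtask, not once per message
            subtasks.add(sink);
        }
        for (int r = 0; r < records; r++) {
            subtasks.get(r % parallelism).invoke("message-" + r);
        }
        for (SinkInstance sink : subtasks) {
            sink.close();
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<SinkInstance> subtasks = run(4, 10);
        int opens = subtasks.stream().mapToInt(s -> s.opens).sum();
        int invokes = subtasks.stream().mapToInt(s -> s.invokes).sum();
        System.out.println(opens + " opens, " + invokes + " invokes, "
                + SESSIONS_CREATED.get() + " session(s) created");
    }
}
```

With a parallelism of 4 and 10 messages it prints `4 opens, 10 invokes, 1 session(s) created`. Without the shared holder, each subtask would open its own session in `open()`, which is usually still cheap enough because it happens once per subtask rather than per message.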

Cheers,
Till

On Fri, Dec 9, 2016 at 8:05 PM, Meghashyam Sandeep V <
vr1meghashyam@gmail.com> wrote:

> Thanks a lot for the quick reply, Shannon.
>
> 1. I will create a class that extends SinkFunction and write my connection
> logic there. My only question is: will a dbSession be created for each
> message/partition, which might affect performance? That's why I added
> this line, to create a connection once and use it throughout the
> datastream:
> if (dbSession == null && store != null) { dbSession = getSession(); }
>
> 2. I couldn't use flink-connector-cassandra because I have SSL enabled for
> my C* cluster, and I couldn't get it to work with all my SSL config
> (truststore, keystore, etc.) added to the cluster building. I didn't find a
> proper example of flink-connector-cassandra with SSL enabled.
>
>
> Thanks
>
>
>
>
> On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey <sc...@expedia.com> wrote:
>
>> You haven't really added a sink in Flink terminology; you're just
>> performing a side effect within a map operator. So while it may work, if
>> you want to add a proper sink, you need an object that extends
>> SinkFunction or RichSinkFunction. The method call on the stream should be
>> ".addSink(…)".
>>
>> Also, the dbSession isn't really Flink state, as it does not vary with
>> the position in, or the content of, the stream. It's a necessary helper
>> object, yes, but you don't need Flink to checkpoint it.
>>
>> You can still use the sinks provided with flink-connector-cassandra and
>> customize the cluster building by passing your own ClusterBuilder into the
>> constructor.
>>
>> -Shannon
>>
>> From: Meghashyam Sandeep V <vr...@gmail.com>
>> Date: Friday, December 9, 2016 at 12:26 PM
>> To: <us...@flink.apache.org>, <de...@flink.apache.org>
>> Subject: Reg. custom sinks in Flink
>>
>> Hi there,
>>
>> I have a Flink streaming app where my source is Kafka and a custom sink
>> to Cassandra (I can't use the standard C* sink that comes with Flink, as I
>> have customized auth to C*). I currently have the following:
>>
>> messageStream
>>         .rebalance()
>>         .map(s -> {
>>             return mapper.readValue(s, JsonNode.class);
>>         })
>>         .filter(message -> true) // placeholder: filter some messages
>>         .map((MapFunction<JsonNode, String>) message -> {
>>             getDbSession().execute("QUERY_TO_EXEC");
>>             return message.toString(); // a MapFunction<JsonNode, String> must return a String
>>         });
>>
>> private static Session getDbSession() {
>>     if (dbSession == null && store != null) {
>>         dbSession = getSession();
>>     }
>>
>>     return dbSession;
>> }
>>
>> 1. Is this the right way to add a custom sink? As you can see, I have dbSession as a class variable here and I'm storing its state.
>>
>> 2. This setup works fine in standalone Flink (java -jar MyJar.jar). When I run it using Flink on YARN on EMR, I get an NPE at the session, which is kind of weird.
>>
>>
>> Thanks
>>
>>
>

Re: Reg. custom sinks in Flink

Posted by Meghashyam Sandeep V <vr...@gmail.com>.
Thanks a lot for the quick reply, Shannon.

1. I will create a class that extends SinkFunction and write my connection
logic there. My only question is: will a dbSession be created for each
message/partition, which might affect performance? That's why I added
this line, to create a connection once and use it throughout the
datastream:
if (dbSession == null && store != null) { dbSession = getSession(); }

2. I couldn't use flink-connector-cassandra because I have SSL enabled for my
C* cluster, and I couldn't get it to work with all my SSL config
(truststore, keystore, etc.) added to the cluster building. I didn't find a
proper example of flink-connector-cassandra with SSL enabled.


Thanks




On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey <sc...@expedia.com> wrote:

> You haven't really added a sink in Flink terminology; you're just
> performing a side effect within a map operator. So while it may work, if
> you want to add a proper sink, you need an object that extends
> SinkFunction or RichSinkFunction. The method call on the stream should be
> ".addSink(…)".
>
> Also, the dbSession isn't really Flink state, as it does not vary with
> the position in, or the content of, the stream. It's a necessary helper
> object, yes, but you don't need Flink to checkpoint it.
>
> You can still use the sinks provided with flink-connector-cassandra and
> customize the cluster building by passing your own ClusterBuilder into the
> constructor.
>
> -Shannon
>
> From: Meghashyam Sandeep V <vr...@gmail.com>
> Date: Friday, December 9, 2016 at 12:26 PM
> To: <us...@flink.apache.org>, <de...@flink.apache.org>
> Subject: Reg. custom sinks in Flink
>
> Hi there,
>
> I have a Flink streaming app where my source is Kafka and a custom sink to
> Cassandra (I can't use the standard C* sink that comes with Flink, as I have
> customized auth to C*). I currently have the following:
>
> messageStream
>         .rebalance()
>         .map(s -> {
>             return mapper.readValue(s, JsonNode.class);
>         })
>         .filter(message -> true) // placeholder: filter some messages
>         .map((MapFunction<JsonNode, String>) message -> {
>             getDbSession().execute("QUERY_TO_EXEC");
>             return message.toString(); // a MapFunction<JsonNode, String> must return a String
>         });
>
> private static Session getDbSession() {
>     if (dbSession == null && store != null) {
>         dbSession = getSession();
>     }
>
>     return dbSession;
> }
>
> 1. Is this the right way to add a custom sink? As you can see, I have dbSession as a class variable here and I'm storing its state.
>
> 2. This setup works fine in standalone Flink (java -jar MyJar.jar). When I run it using Flink on YARN on EMR, I get an NPE at the session, which is kind of weird.
>
>
> Thanks
>
>

Re: Reg. custom sinks in Flink

Posted by Shannon Carey <sc...@expedia.com>.
You haven't really added a sink in Flink terminology; you're just performing a side effect within a map operator. So while it may work, if you want to add a proper sink, you need an object that extends SinkFunction or RichSinkFunction. The method call on the stream should be ".addSink(…)".

Also, the dbSession isn't really Flink state, as it does not vary with the position in, or the content of, the stream. It's a necessary helper object, yes, but you don't need Flink to checkpoint it.

You can still use the sinks provided with flink-connector-cassandra and customize the cluster building by passing your own ClusterBuilder into the constructor.
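For the SSL case, the part you control inside a custom `ClusterBuilder` is how the TLS context is built from your truststore and keystore. The sketch below uses only JDK classes; file paths and passwords are placeholders, and the keystore is only needed if your cluster requires client certificates.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

public class CassandraSslContext {

    /** Loads a keystore file of the JVM's default keystore type; path and password are placeholders. */
    static KeyStore load(String path, char[] password) throws IOException, GeneralSecurityException {
        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        try (InputStream in = new FileInputStream(path)) {
            ks.load(in, password);
        }
        return ks;
    }

    /**
     * Builds a TLS context from a truststore (which server certificates to trust) and an
     * optional keystore (client certificate, only needed if the cluster requires client auth).
     */
    static SSLContext build(KeyStore trustStore, KeyStore keyStore, char[] keyPassword)
            throws GeneralSecurityException {
        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);

        KeyManagerFactory kmf = null;
        if (keyStore != null) {
            kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            kmf.init(keyStore, keyPassword);
        }

        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(kmf == null ? null : kmf.getKeyManagers(), tmf.getTrustManagers(), null);
        return ctx;
    }
}
```

Inside `buildCluster(Cluster.Builder builder)` you would hand this `SSLContext` to the DataStax driver's SSL options and pass them to `builder.withSSL(...)`; the exact options class differs between driver versions (for example `JdkSSLOptions` in the 3.x driver), so check the documentation for the driver version bundled with your connector. Building the context inside `buildCluster`, rather than storing it in a field, avoids serialization problems, since the `ClusterBuilder` itself is shipped to the TaskManagers and `SSLContext` is not serializable. The truststore and keystore paths must then be readable on every TaskManager host.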

-Shannon

From: Meghashyam Sandeep V <vr...@gmail.com>
Date: Friday, December 9, 2016 at 12:26 PM
To: <us...@flink.apache.org>, <de...@flink.apache.org>
Subject: Reg. custom sinks in Flink

Hi there,

I have a Flink streaming app where my source is Kafka and a custom sink to Cassandra (I can't use the standard C* sink that comes with Flink, as I have customized auth to C*). I currently have the following:


messageStream
        .rebalance()
        .map(s -> {
            return mapper.readValue(s, JsonNode.class);
        })
        .filter(message -> true) // placeholder: filter some messages
        .map((MapFunction<JsonNode, String>) message -> {
            getDbSession().execute("QUERY_TO_EXEC");
            return message.toString(); // a MapFunction<JsonNode, String> must return a String
        });

private static Session getDbSession() {
    if (dbSession == null && store != null) {
        dbSession = getSession();
    }

    return dbSession;
}

1. Is this the right way to add a custom sink? As you can see, I have dbSession as a class variable here and I'm storing its state.

2. This setup works fine in standalone Flink (java -jar MyJar.jar). When I run it using Flink on YARN on EMR, I get an NPE at the session, which is kind of weird.


Thanks
