Posted to user@flink.apache.org by Soheil Pourbafrani <so...@gmail.com> on 2018/04/26 12:52:53 UTC

Insert data into Cassandra without Flink Cassandra connection

I want to use a native Cassandra connection (not the Flink Cassandra
connector) to insert some data into Cassandra. As the code is designed, the
connection to Cassandra is opened once at startup and all TaskManagers use
it to write data. This works fine in local mode.

The problem is that when I submit the job to a YARN cluster, each
TaskManager runs in its own JVM, so the Cassandra connection is not shared
and I have to open and close it in each TaskManager. Is there any way to
have one connection shared by all TaskManagers?

Re: Insert data into Cassandra without Flink Cassandra connection

Posted by Shuyi Chen <su...@gmail.com>.
Maybe you can share a bit more about why you need only one connection to
Cassandra across all TaskManagers, so we can better help?

On Wed, May 2, 2018 at 4:08 AM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> [...]


Re: Insert data into Cassandra without Flink Cassandra connection

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

The only way I can think of is to run your flatMap operator with parallelism 1, but that might defeat the purpose. Otherwise, there is no way to open a single connection and share it across multiple TaskManagers (which may be running on different physical machines). Please rethink your approach with respect to the distributed nature of Flink.
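If the real goal is to avoid one connection per parallel instance, the most sharing available is roughly one connection per TaskManager JVM. A common general-JVM idiom for that (not a Flink API) is a static, reference-counted holder: the first task to acquire it opens the connection and the last one to release it closes it. Strictly, static state is per class loader, and Flink usually loads job code through a user-code class loader, so treat "one per TaskManager" as an approximation. The sketch below uses only the JDK; `FakeConnection`, `SharedConnection` and `SharedConnectionDemo` are hypothetical names, and the threads stand in for parallel subtasks running in one TaskManager.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SharedConnectionDemo {

    // Hypothetical stand-in for a Cassandra driver session; counts opens and closes.
    static final class FakeConnection implements AutoCloseable {
        static final AtomicInteger OPENED = new AtomicInteger();
        static final AtomicInteger CLOSED = new AtomicInteger();

        FakeConnection() { OPENED.incrementAndGet(); }

        void write(String row) { /* a real session would execute an INSERT here */ }

        @Override
        public void close() { CLOSED.incrementAndGet(); }
    }

    // One shared connection per class loader, i.e. roughly one per TaskManager JVM.
    static final class SharedConnection {
        private static FakeConnection connection;
        private static int refCount = 0;

        // Call from each parallel instance's open(): the first caller creates the connection.
        static synchronized FakeConnection acquire() {
            if (refCount++ == 0) {
                connection = new FakeConnection();
            }
            return connection;
        }

        // Call from each parallel instance's close(): the last caller closes it.
        static synchronized void release() {
            if (refCount == 0) {
                throw new IllegalStateException("release() without acquire()");
            }
            if (--refCount == 0) {
                connection.close();
                connection = null;
            }
        }
    }

    // Simulates `slots` concurrently running subtasks in one JVM, each writing `rows` rows.
    static void run(int slots, int rows) {
        ExecutorService pool = Executors.newFixedThreadPool(slots);
        CountDownLatch allOpened = new CountDownLatch(slots);
        for (int s = 0; s < slots; s++) {
            pool.submit(() -> {
                FakeConnection conn = SharedConnection.acquire();
                try {
                    allOpened.countDown();
                    allOpened.await(); // keep all subtasks alive at once, as in a running job
                    for (int r = 0; r < rows; r++) {
                        conn.write("row " + r);
                    }
                } finally {
                    SharedConnection.release();
                }
                return null;
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        run(4, 1000);
        System.out.println("opened=" + FakeConnection.OPENED.get()
                + " closed=" + FakeConnection.CLOSED.get());
    }
}
```

In a Flink job, `acquire()` would be called from a rich function's `open()` and `release()` from its `close()`. Note that this still gives one connection per TaskManager, not one for the whole cluster, which is exactly the limit described above.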

However, there are some valid use cases where one would want part of the job graph to be distributed and some part(s) non-distributed, such as issuing a single commit after a distributed write, or processing data in parallel but writing it to a relational database like MySQL via a single sink operator.
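The "process in parallel, write through a single sink" case above is typically expressed in a Flink job by giving the sink parallelism 1 (for example `stream.addSink(mySink).setParallelism(1)`, where `mySink` is your own sink function) while upstream operators keep a higher parallelism. The sketch below is a JDK-only analogy, not Flink code: several worker threads process records and hand them over a queue to one writer, which is the only place a connection would be needed. `SingleWriterDemo` and the `END` sentinel are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class SingleWriterDemo {
    // Sentinel each worker emits when it has no more output.
    private static final String END = "__END__";

    // Processes `inputs` on `workers` threads, then writes all results through one writer.
    static List<String> run(List<Integer> inputs, int workers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        // Parallel processing stage: worker `id` handles every workers-th element.
        for (int w = 0; w < workers; w++) {
            final int id = w;
            pool.submit(() -> {
                try {
                    for (int i = id; i < inputs.size(); i += workers) {
                        queue.put("processed-" + inputs.get(i));
                    }
                    queue.put(END);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();

        // Single writer stage (the "parallelism 1" sink): only this loop would hold a connection.
        List<String> written = new ArrayList<>();
        int finished = 0;
        try {
            while (finished < workers) {
                String row = queue.take();
                if (END.equals(row)) {
                    finished++;
                } else {
                    written.add(row); // a real sink would send this row over its one connection
                }
            }
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while writing", e);
        }
        return written;
    }

    public static void main(String[] args) {
        List<Integer> inputs = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            inputs.add(i);
        }
        System.out.println(run(inputs, 3).size() + " rows written by a single writer");
    }
}
```

The trade-off is the one mentioned above: the single writer caps write throughput, so this shape only pays off when writes are cheap relative to processing or must be serialized (for example, one final commit).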

Piotrek

> On 26 Apr 2018, at 15:23, Soheil Pourbafrani <so...@gmail.com> wrote:
> [...]


Re: Insert data into Cassandra without Flink Cassandra connection

Posted by Soheil Pourbafrani <so...@gmail.com>.
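Here is my code:

import java.nio.ByteBuffer;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Parser, CassandraConnection, HexUtiles and ConfigHashMap are application classes/fields not shown here.
stream.flatMap(new FlatMapFunction<byte[], Void>() {

    // Row counter; each parallel instance of this function keeps its own copy.
    private long i = 0;

    @Override
    public void flatMap(byte[] value, Collector<Void> out) throws Exception {
        Parser.setInsert(true);
        // flatMap() runs once per incoming record, so connect() is also called once per record.
        CassandraConnection.connect();
        System.out.println("\n*********** New Message ***********\n");
        System.out.println("Row Number : " + i++);
        System.out.println("Message    : " + HexUtiles.bytesToHex(value));
        Parser.parse(ByteBuffer.wrap(value), ConfigHashMap);
    }
});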
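In the snippet above, `CassandraConnection.connect()` sits inside `flatMap()`, which Flink calls for every record. Flink's rich function variants (such as `RichFlatMapFunction` and `RichSinkFunction`) expose `open()` and `close()` hooks that run once per parallel instance, which is the usual place to create and release a client. To stay runnable without Flink or a Cassandra driver, the sketch below only mimics that lifecycle; `OpenOnceDemo`, `OpenOnceWriter` and `FakeSession` are hypothetical names, and the per-record count assumes `connect()` really opens a new session on each call.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class OpenOnceDemo {
    // Counts how many sessions were opened (stand-in for CassandraConnection.connect()).
    static final AtomicInteger CONNECTS = new AtomicInteger();

    // Hypothetical stand-in for a Cassandra driver session.
    static final class FakeSession implements AutoCloseable {
        FakeSession() { CONNECTS.incrementAndGet(); }

        void insert(byte[] row) { /* a real session would execute an INSERT here */ }

        @Override
        public void close() { /* a real session would release its resources here */ }
    }

    // Same shape as a rich function: open() once, process() per record, close() once.
    static final class OpenOnceWriter {
        private FakeSession session;

        void open() { session = new FakeSession(); }

        void process(byte[] value) { session.insert(value); }

        void close() {
            if (session != null) {
                session.close();
                session = null;
            }
        }
    }

    // Pattern from the post: a session per record. Returns the number of sessions opened.
    static int perRecord(List<byte[]> records) {
        int before = CONNECTS.get();
        for (byte[] value : records) {
            try (FakeSession session = new FakeSession()) {
                session.insert(value);
            }
        }
        return CONNECTS.get() - before;
    }

    // Lifecycle pattern: one session for the whole instance. Returns the number opened.
    static int openOnce(List<byte[]> records) {
        int before = CONNECTS.get();
        OpenOnceWriter writer = new OpenOnceWriter();
        writer.open();
        try {
            for (byte[] value : records) {
                writer.process(value);
            }
        } finally {
            writer.close();
        }
        return CONNECTS.get() - before;
    }

    public static void main(String[] args) {
        List<byte[]> records = Arrays.asList(new byte[] {1}, new byte[] {2}, new byte[] {3});
        System.out.println("per record: " + perRecord(records) + " sessions");
        System.out.println("open once:  " + openOnce(records) + " sessions");
    }
}
```

In real Flink code the same idea means extending a rich function and moving the connect call into `open()` (and the disconnect into `close()`), which gives one connection per parallel instance rather than per record.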



On Thu, Apr 26, 2018 at 5:22 PM, Soheil Pourbafrani <so...@gmail.com>
wrote:

> [...]