You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by shashank agarwal <sh...@gmail.com> on 2017/12/19 10:04:10 UTC

Cassandra POJO sink flink 1.4.0 in scala

HI,

I have upgraded flink from 1.3.2 to 1.4.0. I am using cassandra sink in my
scala application. Before sink, i was converting my scala datastream to
java stream and sinking in Cassandra. I have created pojo class in scala
liked that :

@SerialVersionUID(507L)
@Table(keyspace = "neofp", name = "order_detail")
case class OrderFinal(
                       @BeanProperty var order_name: String,
                       @BeanProperty var user: String )extends Serializable
{
  def this() {
    this("NA", "NA",)
  }
}

and this was working fine with sink. after upgrading to 1.4.0 it's giving
error "Query must not be null or empty."

After dig into the CassandraSink code, I have found it's treating it as
case class and running CassandraScalaProductSinkBuilder which
do sanityCheck of query existence.

So how I can create POJO class in scala so CassandraSink treats it
as CassandraPojoSinkBuilder?

For workaround now I have downgraded the only connector to 1.3.2


Thanks
Shashank

Re: Cassandra POJO sink flink 1.4.0 in scala

Posted by Timo Walther <tw...@apache.org>.
Hi Shashank,

Scala case classes are treated as a special tuple type in Flink. If you 
want to make a POJO out of it, just remove the "case" keyword and make 
sure that the class is static (in the companion object).

I hope that helps.

Timo


Am 12/19/17 um 11:04 AM schrieb shashank agarwal:
> HI,
>
> I have upgraded flink from 1.3.2 to 1.4.0. I am using cassandra sink 
> in my scala application. Before sink, i was converting my scala 
> datastream to java stream and sinking in Cassandra. I have created 
> pojo class in scala liked that :
>
> @SerialVersionUID(507L)
> @Table(keyspace = "neofp", name = "order_detail")
> case class OrderFinal(
>                        @BeanProperty var order_name: String,
>                        @BeanProperty var user: String )extends 
> Serializable
> {
>   def this() {
>     this("NA", "NA",)
>   }
> }
>
> and this was working fine with sink. after upgrading to 1.4.0 it's 
> giving error "Query must not be null or empty."
>
> After dig into the CassandraSink code, I have found it's treating it 
> as case class and running CassandraScalaProductSinkBuilder which 
> do sanityCheck of query existence.
>
> So how I can create POJO class in scala so CassandraSink treats it 
> as CassandraPojoSinkBuilder?
>
> For workaround now I have downgraded the only connector to 1.3.2
>
>
> Thanks
> Shashank
>


Re: Cassandra POJO sink flink 1.4.0 in scala

Posted by Timo Walther <tw...@apache.org>.
Hi Shashank,

the exception you get is a known issue [0] that will be fixed with Flink 
1.4.1. We improved the dependency management but it seems this causes 
some problems with the Cassandra connector right now.

So as a workaround you can add netty (version 4.0) to your dependencies. 
This should fix the problem until the new Flink version. Please let us 
know if yes.

Regards,
Timo

[0] https://issues.apache.org/jira/browse/FLINK-8295



Am 12/19/17 um 1:41 PM schrieb shashank agarwal:
> I have tried that by creating class with companion static object:
>
> @SerialVersionUID(507L)
> @Table(keyspace = "neofp", name = "order_detail")
> class OrderFinal(
>  @BeanProperty var order_name: String,
>  @BeanProperty var user: String )extends Serializable
> {
>   def this() {
>     this("NA", "NA",)
>   }
> }
> object OrderFinal
> {
>
> }
>
>
>
> When running with 1.4.0 it's giving following error :
>
>
> java.lang.NoClassDefFoundError: Could not initialize class 
> com.datastax.driver.core.NettyUtil
> at 
> com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:96)
> at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at 
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:88)
> at 
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:64)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
>
> 12/19/2017 18:04:33Job execution switched to status FAILING.
> java.lang.NoClassDefFoundError: Could not initialize class 
> com.datastax.driver.core.NettyUtil
> at 
> com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:96)
> at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at 
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:88)
> at 
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:64)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
>
> On Tue, Dec 19, 2017 at 3:34 PM, shashank agarwal 
> <shashank734@gmail.com <ma...@gmail.com>> wrote:
>
>     HI,
>
>     I have upgraded flink from 1.3.2 to 1.4.0. I am using cassandra
>     sink in my scala application. Before sink, i was converting my
>     scala datastream to java stream and sinking in Cassandra. I have
>     created pojo class in scala liked that :
>
>     @SerialVersionUID(507L)
>     @Table(keyspace = "neofp", name = "order_detail")
>     case class OrderFinal(
>                            @BeanProperty var order_name: String,
>                            @BeanProperty var user: String )extends
>     Serializable
>     {
>       def this() {
>         this("NA", "NA",)
>       }
>     }
>
>     and this was working fine with sink. after upgrading to 1.4.0 it's
>     giving error "Query must not be null or empty."
>
>     After dig into the CassandraSink code, I have found it's treating
>     it as case class and running CassandraScalaProductSinkBuilder
>     which do sanityCheck of query existence.
>
>     So how I can create POJO class in scala so CassandraSink treats it
>     as CassandraPojoSinkBuilder?
>
>     For workaround now I have downgraded the only connector to 1.3.2
>
>
>     Thanks
>     Shashank
>
>
>
>
> -- 
> Thanks Regards
>
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things....
>


Re: Cassandra POJO sink flink 1.4.0 in scala

Posted by shashank agarwal <sh...@gmail.com>.
 Hi Micheal,

Thanks for the response actually I have solved the issue. I was sharing my
knowledge how I solved that. For sinking scala classes like JAVA Pojo. We
have to convert that to
JavaStream first but in 1.4 that already done by connector so no need to do
that in 1.4

We have to write scala class like this.

@SerialVersionUID(507L)
@Table(keyspace = "twtt", name = "order")
class OrderFinal(
                       @BeanProperty var name: String,
                       @BeanProperty var userEmail: String)extends
Serializable
{
  def this() {
    this("NA", "NA")
  }
}

The variable name should be same as the column name in Cassandra table. If
we use case class in case of class than that will call
CassandraScalaProductSinkBuilder in the connector at that required query.
So we are making scala class as POJO class.
Scala class should be handled in connector separately. I will request the
feature for the same. While that this is workaround for scala developers.





‌

On Thu, Dec 28, 2017 at 11:28 AM, Michael Fong <mc...@gmail.com>
wrote:

> Hi, shashank agarwal
> <https://plus.google.com/u/1/102996216504701757998?prsrc=4>
>
>
> Not sure if I can answer fully your question, but after digging some code,
> I am not sure if C* connector totally supports Scala case class + CQL data
> mapping at the moment. I may be totally wrong, and you need to ask the
> flink dev about this. However, I have some toy examples that you could
> check out to see which uses CassandraScalaProductSinkBuilder + predefined
> CQL query + entity. I am not using Scala case class so may not fit your
> need.
>
> The example snippet you may find @
> https://github.com/mcfongtw/flink-cassandra-connector-examples/
>
> Regards,
>
> On Thu, Dec 28, 2017 at 1:11 PM, Michael Fong <mc...@gmail.com>
> wrote:
>
>> Hi, shashank agarwal
>>
>>
>> AFAIK, in java side, for a pojo data type, you don't need to set query
>> since the CQL data mapping would take care of that whereas dealing with
>> java tuples, you do need to provide a upsert query so that cassandra knows
>> what to insert into the table.
>> Scala tuple case is clear, same as java - providing a CQL query; however,
>> I don't know what's up with Scala pojo case (class) though...
>>
>> Regards,
>>
>> Michael
>>
>
>


-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things....

Re: Cassandra POJO sink flink 1.4.0 in scala

Posted by Michael Fong <mc...@gmail.com>.
Hi, shashank agarwal
<https://plus.google.com/u/1/102996216504701757998?prsrc=4>


Not sure if I can answer fully your question, but after digging some code,
I am not sure if C* connector totally supports Scala case class + CQL data
mapping at the moment. I may be totally wrong, and you need to ask the
flink dev about this. However, I have some toy examples that you could
check out to see which uses CassandraScalaProductSinkBuilder + predefined
CQL query + entity. I am not using Scala case class so may not fit your
need.

The example snippet you may find @
https://github.com/mcfongtw/flink-cassandra-connector-examples/

Regards,

On Thu, Dec 28, 2017 at 1:11 PM, Michael Fong <mc...@gmail.com> wrote:

> Hi, shashank agarwal
>
>
> AFAIK, in java side, for a pojo data type, you don't need to set query
> since the CQL data mapping would take care of that whereas dealing with
> java tuples, you do need to provide a upsert query so that cassandra knows
> what to insert into the table.
> Scala tuple case is clear, same as java - providing a CQL query; however,
> I don't know what's up with Scala pojo case (class) though...
>
> Regards,
>
> Michael
>

Re: Cassandra POJO sink flink 1.4.0 in scala

Posted by Michael Fong <mc...@gmail.com>.
Hi, shashank agarwal


AFAIK, in java side, for a pojo data type, you don't need to set query
since the CQL data mapping would take care of that whereas dealing with
java tuples, you do need to provide a upsert query so that cassandra knows
what to insert into the table.
Scala tuple case is clear, same as java - providing a CQL query; however, I
don't know what's up with Scala pojo case (class) though...

Regards,

Michael

Re: Cassandra POJO sink flink 1.4.0 in scala

Posted by shashank agarwal <sh...@gmail.com>.
Yes but when CassandraScalaProductSinkBuilder called after identifying case
class in CassandraSink class it will do sanityCheck and will throw the
exception cause It won’t pass any query in that case.

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java

On Thu, 21 Dec 2017 at 7:36 PM, Timo Walther <tw...@apache.org> wrote:

> Thanks for letting us know. The netty issue will be fixed in Flink 1.4.1.
>
> For case classes there is also a dedicated cassandra sink (every case
> class is a Product):
>
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
>
> Regards,
> Timo
>
>
>
> Am 12/21/17 um 1:39 PM schrieb shashank agarwal:
>
> Hi,
>
> I have added netty-all 4.0 as dependency now it's working fine. Only thing
> I had to create POJO class ion scala like this.
>
> @SerialVersionUID(507L)
> @Table(keyspace = "twtt", name = "order")
> class OrderFinal(
>                        @BeanProperty var name: String,
>                        @BeanProperty var userEmail: String)extends
> Serializable
> {
>   def this() {
>     this("NA", "NA")
>   }
> }
>
>
> If I am removing @BeanProperty or converting var to Val. It's giving error
> of no setters or getters found or multiple found. This is the final
> workaround i found.
>
>
>
> ‌
>
> On Tue, Dec 19, 2017 at 6:11 PM, shashank agarwal <sh...@gmail.com>
> wrote:
>
>> I have tried that by creating class with companion static object:
>>
>> @SerialVersionUID(507L)
>> @Table(keyspace = "neofp", name = "order_detail")
>> class OrderFinal(
>>                        @BeanProperty var order_name: String,
>>                        @BeanProperty var user: String )extends
>> Serializable
>> {
>>   def this() {
>>     this("NA", "NA",)
>>   }
>> }
>> object OrderFinal
>> {
>>
>> }
>>
>>
>>
>> When running with 1.4.0 it's giving following error :
>>
>>
>> java.lang.NoClassDefFoundError: Could not initialize class
>> com.datastax.driver.core.NettyUtil
>> at
>> com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:96)
>> at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706)
>> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365)
>> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
>> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
>> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
>> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
>> at
>> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:88)
>> at
>> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:64)
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> at
>> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 12/19/2017 18:04:33 Job execution switched to status FAILING.
>> java.lang.NoClassDefFoundError: Could not initialize class
>> com.datastax.driver.core.NettyUtil
>> at
>> com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:96)
>> at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706)
>> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365)
>> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
>> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
>> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
>> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
>> at
>> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:88)
>> at
>> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:64)
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> at
>> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> On Tue, Dec 19, 2017 at 3:34 PM, shashank agarwal <sh...@gmail.com>
>> wrote:
>>
>>> HI,
>>>
>>> I have upgraded flink from 1.3.2 to 1.4.0. I am using cassandra sink in
>>> my scala application. Before sink, i was converting my scala datastream to
>>> java stream and sinking in Cassandra. I have created pojo class in scala
>>> liked that :
>>>
>>> @SerialVersionUID(507L)
>>> @Table(keyspace = "neofp", name = "order_detail")
>>> case class OrderFinal(
>>>                        @BeanProperty var order_name: String,
>>>                        @BeanProperty var user: String )extends
>>> Serializable
>>> {
>>>   def this() {
>>>     this("NA", "NA",)
>>>   }
>>> }
>>>
>>> and this was working fine with sink. after upgrading to 1.4.0 it's
>>> giving error "Query must not be null or empty."
>>>
>>> After dig into the CassandraSink code, I have found it's treating it as
>>> case class and running CassandraScalaProductSinkBuilder which
>>> do sanityCheck of query existence.
>>>
>>> So how I can create POJO class in scala so CassandraSink treats it
>>> as CassandraPojoSinkBuilder?
>>>
>>> For workaround now I have downgraded the only connector to 1.3.2
>>>
>>>
>>> Thanks
>>> Shashank
>>>
>>>
>>
>>
>> --
>> Thanks Regards
>>
>> SHASHANK AGARWAL
>>  ---  Trying to mobilize the things....
>>
>>
>
>
> --
> Thanks Regards
>
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things....
>
>
> --
Sent from iPhone 5

Re: Cassandra POJO sink flink 1.4.0 in scala

Posted by Timo Walther <tw...@apache.org>.
Thanks for letting us know. The netty issue will be fixed in Flink 1.4.1.

For case classes there is also a dedicated cassandra sink (every case 
class is a Product):

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java

Regards,
Timo



Am 12/21/17 um 1:39 PM schrieb shashank agarwal:
> Hi,
>
> I have added netty-all 4.0 as dependency now it's working fine. Only 
> thing I had to create POJO class ion scala like this.
>
> @SerialVersionUID(507L)
> @Table(keyspace = "twtt", name = "order")
> class OrderFinal(
>                        @BeanProperty var name: String,
>                        @BeanProperty var userEmail: String)extends 
> Serializable
> {
>   def this() {
>     this("NA", "NA")
>   }
> }
>
>
> If I am removing @BeanProperty or converting var to Val. It's giving 
> error of no setters or getters found or multiple found. This is the 
> final workaround i found.
>
>
>
> ‌
>
> On Tue, Dec 19, 2017 at 6:11 PM, shashank agarwal 
> <shashank734@gmail.com <ma...@gmail.com>> wrote:
>
>     I have tried that by creating class with companion static object:
>
>     @SerialVersionUID(507L)
>     @Table(keyspace = "neofp", name = "order_detail")
>     class OrderFinal(
>      @BeanProperty var order_name: String,
>      @BeanProperty var user: String )extends Serializable
>     {
>       def this() {
>         this("NA", "NA",)
>       }
>     }
>     object OrderFinal
>     {
>
>     }
>
>
>
>     When running with 1.4.0 it's giving following error :
>
>
>     java.lang.NoClassDefFoundError: Could not initialize class
>     com.datastax.driver.core.NettyUtil
>     at
>     com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:96)
>     at
>     com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706)
>     at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365)
>     at com.datastax.driver.core.Cluster.init(Cluster.java:162)
>     at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
>     at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
>     at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
>     at
>     org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:88)
>     at
>     org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:64)
>     at
>     org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at
>     org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at
>     org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>     at
>     org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>     at
>     org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:745)
>
>     12/19/2017 18:04:33Job execution switched to status FAILING.
>     java.lang.NoClassDefFoundError: Could not initialize class
>     com.datastax.driver.core.NettyUtil
>     at
>     com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:96)
>     at
>     com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706)
>     at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365)
>     at com.datastax.driver.core.Cluster.init(Cluster.java:162)
>     at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
>     at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
>     at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
>     at
>     org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:88)
>     at
>     org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:64)
>     at
>     org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at
>     org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at
>     org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>     at
>     org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>     at
>     org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:745)
>
>     On Tue, Dec 19, 2017 at 3:34 PM, shashank agarwal
>     <shashank734@gmail.com <ma...@gmail.com>> wrote:
>
>         HI,
>
>         I have upgraded flink from 1.3.2 to 1.4.0. I am using
>         cassandra sink in my scala application. Before sink, i was
>         converting my scala datastream to java stream and sinking in
>         Cassandra. I have created pojo class in scala liked that :
>
>         @SerialVersionUID(507L)
>         @Table(keyspace = "neofp", name = "order_detail")
>         case class OrderFinal(
>                                @BeanProperty var order_name: String,
>                                @BeanProperty var user: String )extends
>         Serializable
>         {
>           def this() {
>             this("NA", "NA",)
>           }
>         }
>
>         and this was working fine with sink. after upgrading to 1.4.0
>         it's giving error "Query must not be null or empty."
>
>         After dig into the CassandraSink code, I have found it's
>         treating it as case class and
>         running CassandraScalaProductSinkBuilder which do sanityCheck
>         of query existence.
>
>         So how I can create POJO class in scala so CassandraSink
>         treats it as CassandraPojoSinkBuilder?
>
>         For workaround now I have downgraded the only connector to 1.3.2
>
>
>         Thanks
>         Shashank
>
>
>
>
>     -- 
>     Thanks Regards
>
>     SHASHANK AGARWAL
>      ---  Trying to mobilize the things....
>
>
>
>
> -- 
> Thanks Regards
>
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things....
>


Re: Cassandra POJO sink flink 1.4.0 in scala

Posted by shashank agarwal <sh...@gmail.com>.
Hi,

I have added netty-all 4.0 as dependency now it's working fine. Only thing
I had to create POJO class ion scala like this.

@SerialVersionUID(507L)
@Table(keyspace = "twtt", name = "order")
class OrderFinal(
                       @BeanProperty var name: String,
                       @BeanProperty var userEmail: String)extends
Serializable
{
  def this() {
    this("NA", "NA")
  }
}


If I am removing @BeanProperty or converting var to Val. It's giving error
of no setters or getters found or multiple found. This is the final
workaround i found.



‌

On Tue, Dec 19, 2017 at 6:11 PM, shashank agarwal <sh...@gmail.com>
wrote:

> I have tried that by creating class with companion static object:
>
> @SerialVersionUID(507L)
> @Table(keyspace = "neofp", name = "order_detail")
> class OrderFinal(
>                        @BeanProperty var order_name: String,
>                        @BeanProperty var user: String )extends Serializable
> {
>   def this() {
>     this("NA", "NA",)
>   }
> }
> object OrderFinal
> {
>
> }
>
>
>
> When running with 1.4.0 it's giving following error :
>
>
> java.lang.NoClassDefFoundError: Could not initialize class
> com.datastax.driver.core.NettyUtil
> at com.datastax.driver.core.NettyOptions.eventLoopGroup(
> NettyOptions.java:96)
> at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(
> CassandraSinkBase.java:88)
> at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(
> CassandraPojoSink.java:64)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:102)
> at org.apache.flink.streaming.api.operators.StreamSink.open(
> StreamSink.java:48)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:393)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
>
> 12/19/2017 18:04:33 Job execution switched to status FAILING.
> java.lang.NoClassDefFoundError: Could not initialize class
> com.datastax.driver.core.NettyUtil
> at com.datastax.driver.core.NettyOptions.eventLoopGroup(
> NettyOptions.java:96)
> at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(
> CassandraSinkBase.java:88)
> at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(
> CassandraPojoSink.java:64)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:102)
> at org.apache.flink.streaming.api.operators.StreamSink.open(
> StreamSink.java:48)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:393)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
>
> On Tue, Dec 19, 2017 at 3:34 PM, shashank agarwal <sh...@gmail.com>
> wrote:
>
>> HI,
>>
>> I have upgraded flink from 1.3.2 to 1.4.0. I am using cassandra sink in
>> my scala application. Before sink, i was converting my scala datastream to
>> java stream and sinking in Cassandra. I have created pojo class in scala
>> liked that :
>>
>> @SerialVersionUID(507L)
>> @Table(keyspace = "neofp", name = "order_detail")
>> case class OrderFinal(
>>                        @BeanProperty var order_name: String,
>>                        @BeanProperty var user: String )extends
>> Serializable
>> {
>>   def this() {
>>     this("NA", "NA",)
>>   }
>> }
>>
>> and this was working fine with sink. after upgrading to 1.4.0 it's giving
>> error "Query must not be null or empty."
>>
>> After dig into the CassandraSink code, I have found it's treating it as
>> case class and running CassandraScalaProductSinkBuilder which
>> do sanityCheck of query existence.
>>
>> So how I can create POJO class in scala so CassandraSink treats it
>> as CassandraPojoSinkBuilder?
>>
>> For workaround now I have downgraded the only connector to 1.3.2
>>
>>
>> Thanks
>> Shashank
>>
>>
>
>
> --
> Thanks Regards
>
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things....
>
>


-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things....

Re: Cassandra POJO sink flink 1.4.0 in scala

Posted by shashank agarwal <sh...@gmail.com>.
I have tried that by creating class with companion static object:

@SerialVersionUID(507L)
@Table(keyspace = "neofp", name = "order_detail")
class OrderFinal(
                       @BeanProperty var order_name: String,
                       @BeanProperty var user: String )extends Serializable
{
  def this() {
    this("NA", "NA",)
  }
}
object OrderFinal
{

}



When running with 1.4.0 it's giving following error :


java.lang.NoClassDefFoundError: Could not initialize class
com.datastax.driver.core.NettyUtil
at
com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:96)
at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:88)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:64)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)

12/19/2017 18:04:33 Job execution switched to status FAILING.
java.lang.NoClassDefFoundError: Could not initialize class
com.datastax.driver.core.NettyUtil
at
com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:96)
at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:88)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:64)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)

On Tue, Dec 19, 2017 at 3:34 PM, shashank agarwal <sh...@gmail.com>
wrote:

> HI,
>
> I have upgraded flink from 1.3.2 to 1.4.0. I am using cassandra sink in my
> scala application. Before sink, i was converting my scala datastream to
> java stream and sinking in Cassandra. I have created pojo class in scala
> liked that :
>
> @SerialVersionUID(507L)
> @Table(keyspace = "neofp", name = "order_detail")
> case class OrderFinal(
>                        @BeanProperty var order_name: String,
>                        @BeanProperty var user: String )extends Serializable
> {
>   def this() {
>     this("NA", "NA",)
>   }
> }
>
> and this was working fine with sink. after upgrading to 1.4.0 it's giving
> error "Query must not be null or empty."
>
> After dig into the CassandraSink code, I have found it's treating it as
> case class and running CassandraScalaProductSinkBuilder which
> do sanityCheck of query existence.
>
> So how I can create POJO class in scala so CassandraSink treats it
> as CassandraPojoSinkBuilder?
>
> For workaround now I have downgraded the only connector to 1.3.2
>
>
> Thanks
> Shashank
>
>


-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things....