Posted to dev@phoenix.apache.org by "Rahul (Jira)" <ji...@apache.org> on 2020/02/15 06:18:00 UTC

[jira] [Updated] (PHOENIX-5727) Intermittent Upserts with Kafka and Spark Streaming

     [ https://issues.apache.org/jira/browse/PHOENIX-5727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rahul updated PHOENIX-5727:
---------------------------
    Description: 
Hi,

I have a Spark job that reads from a Kafka stream and writes to a Phoenix table using the Phoenix JDBC thick client, with a commit size of 500. What I have observed is that the job intermittently and silently fails to perform the upserts, without throwing any errors. The incoming data rate is around 1000 rows/sec.

My input data set is such that there will be more updates to existing row keys than inserts of new ones.

Is this a known issue with Phoenix?

 

Sample code (A and B form the composite primary key; the commit size is 500):

 

Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")

val con_startTimeMillis = System.currentTimeMillis()
val con = DriverManager.getConnection("jdbc:phoenix:localhost")
println(">>>> time taken for connection::" + (System.currentTimeMillis() - con_startTimeMillis).toDouble / 1000 + " secs")
con.setAutoCommit(false)

// loop over the records of the current micro-batch
for (rec <- records) {
  var a = rec.getAs("A").toString
  var b = rec.getAs("B").toString
  var c = rec.getAs("C").toString
  var d = if (rec.getAs("D") == null) "" else rec.getAs("D").toString
  var e = if (rec.getAs("E") == null) "" else rec.getAs("E").toString
  var f = if (rec.getAs("F") == null) "" else rec.getAs("F").toString
  var g = if (rec.getAs("G") == null) "" else rec.getAs("G").toString
  var h = if (rec.getAs("H") == null) "0" else rec.getAs("H").toString

  var upsert_stmt = "upsert into " + phoenix_tablename + " values ('" + a + "','" + b + "','" + c + "','" + d + "','" + e + "','" + f + "','" + g + "','" + h + "')"
  println(">>>>upsert statement formed::" + upsert_stmt)

  var stmt = con.prepareStatement(upsert_stmt)
  stmt.executeUpdate()
  bs = bs + 1
  if (bs % commitSize == 0) {
    con.commit()
  }
}

con.commit()
con.close()
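For reference, here is a sketch of the same loop rewritten with bind variables instead of string concatenation (assuming the same con, records, phoenix_tablename, bs and commitSize as above). A single quote inside any column value would malform the concatenated UPSERT text, while bound parameters avoid that:

val sql = "upsert into " + phoenix_tablename + " values (?, ?, ?, ?, ?, ?, ?, ?)"
val pstmt = con.prepareStatement(sql)   // prepared once, reused for every record
for (rec <- records) {
  // null-safe column values, same defaults as the sample above
  val values = Seq(
    rec.getAs("A").toString,
    rec.getAs("B").toString,
    rec.getAs("C").toString,
    if (rec.getAs("D") == null) "" else rec.getAs("D").toString,
    if (rec.getAs("E") == null) "" else rec.getAs("E").toString,
    if (rec.getAs("F") == null) "" else rec.getAs("F").toString,
    if (rec.getAs("G") == null) "" else rec.getAs("G").toString,
    if (rec.getAs("H") == null) "0" else rec.getAs("H").toString
  )
  values.zipWithIndex.foreach { case (v, i) => pstmt.setString(i + 1, v) }
  pstmt.executeUpdate()
  bs = bs + 1
  if (bs % commitSize == 0) con.commit()   // flush every commitSize rows
}
con.commit()   // commit whatever is left after the last full batch
pstmt.close()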

 
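After the final commit I can also run a small sanity check (a sketch; someA and someB are hypothetical placeholders for one of the composite key pairs (A, B) just written):

val check = con.prepareStatement(
  "select count(*) from " + phoenix_tablename + " where A = ? and B = ?")
check.setString(1, someA)   // someA, someB: placeholder key values, not real variables
check.setString(2, someB)
val rs = check.executeQuery()
if (rs.next()) {
  println(">>>> rows present for key (" + someA + "," + someB + "): " + rs.getLong(1))
}
rs.close()
check.close()

If the count stays behind the number of upserts issued, that would suggest the loss happens around the commit rather than in the UPSERT statements themselves.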


> Intermittent Upserts with Kafka and Spark Streaming
> ---------------------------------------------------
>
>                 Key: PHOENIX-5727
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-5727
>             Project: Phoenix
>          Issue Type: Bug
>    Affects Versions: 4.14.0
>            Reporter: Rahul
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)