Posted to dev@phoenix.apache.org by "Rahul (Jira)" <ji...@apache.org> on 2020/02/15 06:18:00 UTC
[jira] [Updated] (PHOENIX-5727) Intermittent Upserts with Kafka and Spark Streaming
[ https://issues.apache.org/jira/browse/PHOENIX-5727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rahul updated PHOENIX-5727:
---------------------------
Description:
Hi,
I have a Spark job that reads from a Kafka stream and writes to a Phoenix table using the Phoenix JDBC thick client with a commit size of 500. What I have observed is that the job intermittently fails to perform the upserts, silently and without throwing any errors. The incoming data rate is around 1000 rows/sec.
My input data set is such that there will be more updates to existing row keys than inserts.
Is this a known issue with Phoenix?
Sample Code
A and B form the composite primary key; the commit size is 500.
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
val con_startTimeMillis = System.currentTimeMillis()
val con = DriverManager.getConnection("jdbc:phoenix:localhost")
println(">>>> time taken for connection::" + (System.currentTimeMillis() - con_startTimeMillis).toDouble / 1000 + " secs")
con.setAutoCommit(false)
var bs = 0
// loop over the rows of the current micro-batch
for (rec <- records) {
  var a = rec.getAs("A").toString
  var b = rec.getAs("B").toString
  var c = rec.getAs("C").toString
  var d = if (rec.getAs("D") == null) "" else rec.getAs("D").toString
  var e = if (rec.getAs("E") == null) "" else rec.getAs("E").toString
  var f = if (rec.getAs("F") == null) "" else rec.getAs("F").toString
  var g = if (rec.getAs("G") == null) "" else rec.getAs("G").toString
  var h = if (rec.getAs("H") == null) "0" else rec.getAs("H").toString
  var upsert_stmt = "upsert into " + phoenix_tablename + " values ('" + a + "','" + b + "','" + c + "','" + d + "','" + e + "','" + f + "','" + g + "','" + h + "')"
  println(">>>>upsert statement formed::" + upsert_stmt)
  var stmt = con.prepareStatement(upsert_stmt)
  stmt.executeUpdate()
  bs = bs + 1
  if (bs % commitSize == 0) { con.commit() }
}
con.commit()
con.close()
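Two things in the snippet above can hide failures: column values are concatenated straight into the UPSERT string (a single quote in any value yields a malformed statement), and neither the return value of executeUpdate() nor a commit-time error is ever surfaced. Below is a minimal sketch of the same loop rewritten with bind parameters and explicit error logging; it assumes the same records (the micro-batch rows), phoenix_tablename and commitSize names as in the snippet above. Since auto-commit is off, Phoenix buffers mutations on the client and only ships them on commit(), so a server-side problem would be expected to surface there.

import java.sql.{Connection, DriverManager, PreparedStatement}

val con: Connection = DriverManager.getConnection("jdbc:phoenix:localhost")
con.setAutoCommit(false)
// one parameterized statement, prepared once and reused for every row
val upsert: PreparedStatement = con.prepareStatement(
  "upsert into " + phoenix_tablename + " values (?, ?, ?, ?, ?, ?, ?, ?)")
val cols = Seq("A", "B", "C", "D", "E", "F", "G", "H")
var bs = 0
try {
  for (rec <- records) {
    cols.zipWithIndex.foreach { case (name, idx) =>
      val v = rec.getAs[Any](name)
      // same null handling as the original: "" for most columns, "0" for H
      upsert.setString(idx + 1, if (v == null) { if (name == "H") "0" else "" } else v.toString)
    }
    upsert.executeUpdate()                  // buffered client-side while auto-commit is off
    bs += 1
    if (bs % commitSize == 0) con.commit()  // mutations are actually sent here
  }
  con.commit()
} catch {
  // log the batch position so missing rows can be correlated instead of disappearing silently
  case e: Exception =>
    println(">>>> upsert batch failed after " + bs + " rows: " + e.getMessage)
    throw e
} finally {
  con.close()
}

Preparing the statement once also avoids re-parsing the SQL for every row, which matters at ~1000 rows/sec, and with this version a failing batch should at least no longer be silent.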
was:
Hi,
I have a Spark job that reads from a Kafka stream and writes to a Phoenix table using the Phoenix JDBC thick client with a commit size of 500. What I have observed is that the job intermittently fails to perform the upserts, silently and without throwing any errors. The incoming data rate is around 1000 rows/sec.
My input data set is such that there will be more updates to existing row keys than inserts.
Is this a known issue with Phoenix?
Sample Code
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
val con_startTimeMillis = System.currentTimeMillis()
val con = DriverManager.getConnection("jdbc:phoenix:localhost")
println(">>>> time taken for connection::" + (System.currentTimeMillis() - con_startTimeMillis).toDouble / 1000 + " secs")
con.setAutoCommit(false)
for (rec <- records) {
  var a = rec.getAs("A").toString
  var b = rec.getAs("B").toString
  var c = rec.getAs("C").toString
  var d = if (rec.getAs("D") == null) "" else rec.getAs("D").toString
  var e = if (rec.getAs("E") == null) "" else rec.getAs("E").toString
  var f = if (rec.getAs("F") == null) "" else rec.getAs("F").toString
  var g = if (rec.getAs("G") == null) "" else rec.getAs("G").toString
  var h = if (rec.getAs("H") == null) "0" else rec.getAs("H").toString
  var i = if (rec.getAs("I") == null) "" else rec.getAs("I").toString
  var upsert_stmt = "upsert into " + phoenix_tablename + " values ('" + a + "','" + b + "','" + c + "','" + d + "','" + e + "','" + f + "','" + g + "','" + h + "')"
  println(">>>>upsert statement formed::" + upsert_stmt)
  var stmt = con.prepareStatement(upsert_stmt)
  stmt.executeUpdate()
  bs = bs + 1
  if (bs % commitSize == 0) { con.commit() }
}
con.commit()
con.close()
> Intermittent Upserts with Kafka and Spark Streaming
> ---------------------------------------------------
>
> Key: PHOENIX-5727
> URL: https://issues.apache.org/jira/browse/PHOENIX-5727
> Project: Phoenix
> Issue Type: Bug
> Affects Versions: 4.14.0
> Reporter: Rahul
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)