Posted to user@spark.apache.org by Amit Singh Hora <ho...@gmail.com> on 2015/10/27 12:53:28 UTC
SPARKONHBase checkpointing issue
Hi all,
I am using Cloudera's SparkOnHBase to bulk insert into HBase. Please find the
code below:
// Imports were not shown in the original post; these assume Spark 1.x with the
// Kafka 0.8 direct stream, Cloudera Labs SparkOnHBase and Jackson 2.
import com.cloudera.spark.hbase.HBaseContext
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.typesafe.config.{Config, ConfigFactory}
import java.util.HashMap
import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.collection.JavaConverters._

object test {

  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.load("connection.conf").getConfig("connection")
    val checkpointDirectory = conf.getString("spark.checkpointDir")
    // Recovers from the checkpoint if one exists; only otherwise calls the factory
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => {
      functionToCreateContext(checkpointDirectory)
    })
    ssc.start()
    ssc.awaitTermination()
  }

  def getHbaseContext(sc: SparkContext, conf: Config): HBaseContext = {
    println("always gets created")
    val hconf = HBaseConfiguration.create()
    val timeout = conf.getString("hbase.zookeepertimeout")
    val master = conf.getString("hbase.hbase_master")
    val zk = conf.getString("hbase.hbase_zkquorum")
    val zkport = conf.getString("hbase.hbase_zk_port")
    hconf.set("zookeeper.session.timeout", timeout)
    hconf.set("hbase.client.retries.number", "1")
    hconf.set("zookeeper.recovery.retry", "1")
    hconf.set("hbase.master", master)
    hconf.set("hbase.zookeeper.quorum", zk)
    hconf.set("zookeeper.znode.parent", "/hbase-unsecure")
    hconf.set("hbase.zookeeper.property.clientPort", zkport)
    new HBaseContext(sc, hconf)
  }

  def functionToCreateContext(checkpointDirectory: String): StreamingContext = {
    println("creating for first time")
    val conf = ConfigFactory.load("connection.conf").getConfig("connection")
    val brokerlist = conf.getString("kafka.broker")
    val topic = conf.getString("kafka.topic")
    val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample")
    sparkConf.set("spark.cleaner.ttl", "2")
    sparkConf.setMaster("local[2]")
    val topicsSet = topic.split(",").toSet
    val batchduration = conf.getString("spark.batchduration").toInt
    val ssc = new StreamingContext(sparkConf, Seconds(batchduration))
    ssc.checkpoint(checkpointDirectory) // set checkpoint directory
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokerlist,
      "auto.offset.reset" -> "smallest")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    val lines = messages.map(_._2)
    getHbaseContext(ssc.sparkContext, conf).streamBulkPut[String](lines,
      "ecs_test",
      (putRecord) => {
        if (putRecord.length() > 0) {
          // convert the JSON string to a Map
          val maprecord: HashMap[String, String] = new ObjectMapper().readValue(
            putRecord, new TypeReference[HashMap[String, String]]() {})
          val ts: Long = maprecord.get("ts").toLong
          val tweetID: Long = maprecord.get("id").toLong
          val key = s"${ts}_$tweetID"
          val put = new Put(Bytes.toBytes(key))
          maprecord.asScala.foreach { case (k, v) =>
            put.add(Bytes.toBytes("test"), Bytes.toBytes(k), Bytes.toBytes(v))
          }
          put
        } else {
          null
        }
      },
      false)
    ssc
  }
}
I am not able to recover from the checkpoint after a restart; I always get:
Unable to getConfig from broadcast
After more debugging, I can see that the method that creates the HBaseContext
actually broadcasts the configuration object that is passed to it.
As a solution, I just want to recreate the HBaseContext in every case,
whether or not the checkpoint exists.
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SPARKONHBase-checkpointing-issue-tp25211.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org
Re: SPARKONHBase checkpointing issue
Posted by Tathagata Das <td...@databricks.com>.
Yes, the workaround is the same as the one suggested in the JIRA for
accumulators and broadcast variables. Basically, make a singleton object
that lazily initializes the HBaseContext. Because it is a singleton, it won't
be serialized into the checkpoint. After recovery, it will be
reinitialized lazily. This is exactly the approach I used for
`SQLContext.getOrCreate()`
<https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L1259>.
Take a look at the code.
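A minimal sketch of that workaround applied to the code in the original post, assuming Spark 1.x and Cloudera Labs SparkOnHBase (`com.cloudera.spark.hbase.HBaseContext` and its `bulkPut(rdd, tableName, f, autoFlush)` method). The names `HBaseSettings`, `HBaseContextSingleton` and `HBaseWriter` are hypothetical, and only a subset of the settings from `getHbaseContext` is shown; the others can be added the same way. The main change is to stop calling `streamBulkPut` on a context built while the DStream graph is being defined, since the output operation it registers presumably keeps a reference to that HBaseContext (and its broadcast). Instead, `foreachRDD` looks the context up in the singleton on every batch, so only plain strings and the conversion function end up in the checkpoint.

```scala
import com.cloudera.spark.hbase.HBaseContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.DStream

// Plain strings only: unlike a Hadoop Configuration or an HBaseContext, a case
// class like this can be captured safely by a checkpointed closure.
case class HBaseSettings(zkQuorum: String, zkPort: String, master: String, zkTimeout: String)

// Lazily initialized singleton. The HBaseContext lives only in this JVM object,
// so it is never written to the checkpoint; after a restart the first batch
// rebuilds it (and its broadcast configuration) against the new SparkContext.
object HBaseContextSingleton {
  @volatile private var instance: HBaseContext = null

  def getOrCreate(sc: SparkContext, s: HBaseSettings): HBaseContext = {
    if (instance == null) {
      synchronized {
        // Re-check under the lock so concurrent callers create it only once
        if (instance == null) {
          val hconf = HBaseConfiguration.create()
          hconf.set("hbase.zookeeper.quorum", s.zkQuorum)
          hconf.set("hbase.zookeeper.property.clientPort", s.zkPort)
          hconf.set("hbase.master", s.master)
          hconf.set("zookeeper.session.timeout", s.zkTimeout)
          hconf.set("zookeeper.znode.parent", "/hbase-unsecure")
          instance = new HBaseContext(sc, hconf)
        }
      }
    }
    instance
  }
}

object HBaseWriter {
  // Used instead of streamBulkPut: the foreachRDD closure captures only
  // `table`, `s` and `toPut`, and fetches the HBaseContext on every batch.
  def writeStream(lines: DStream[String], table: String, s: HBaseSettings,
                  toPut: String => Put): Unit = {
    lines.foreachRDD { rdd =>
      HBaseContextSingleton
        .getOrCreate(rdd.sparkContext, s)
        .bulkPut[String](rdd, table, toPut, false)
    }
  }
}
```

In `functionToCreateContext`, the `streamBulkPut` call would then become something like `HBaseWriter.writeStream(lines, "ecs_test", settings, toPut)`, where `settings` is filled from `connection.conf` and `toPut` is the existing JSON-to-`Put` function. Because `StreamingContext.getOrCreate` skips the factory on recovery, the HBaseContext is no longer created there at all; the singleton builds it on the first batch in both cases.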
On Tue, Oct 27, 2015 at 11:19 PM, Amit Hora <ho...@gmail.com> wrote:
> Thanks for sharing the link. Yes, I understand that the state of accumulators
> and broadcast variables is not recovered from a checkpoint, but is there
> any way to specify that the HBaseContext in this job should not
> be recovered from the checkpoint and must be reinitialized instead?
> ------------------------------
> From: Adrian Tanase <at...@adobe.com>
> Sent: 27-10-2015 18:08
> To: Amit Singh Hora <ho...@gmail.com>; user@spark.apache.org
> Subject: Re: SPARKONHBase checkpointing issue
>
> Does this help?
>
> https://issues.apache.org/jira/browse/SPARK-5206
RE: SPARKONHBase checkpointing issue
Posted by Amit Hora <ho...@gmail.com>.
Thanks for sharing the link. Yes, I understand that the state of accumulators and broadcast variables is not recovered from a checkpoint, but is there any way to specify that the HBaseContext in this job should not be recovered from the checkpoint and must be reinitialized instead?
-----Original Message-----
From: "Adrian Tanase" <at...@adobe.com>
Sent: 27-10-2015 18:08
To: "Amit Singh Hora" <ho...@gmail.com>; "user@spark.apache.org" <us...@spark.apache.org>
Subject: Re: SPARKONHBase checkpointing issue
Does this help?
https://issues.apache.org/jira/browse/SPARK-5206
Re: SPARKONHBase checkpointing issue
Posted by Adrian Tanase <at...@adobe.com>.
Does this help?
https://issues.apache.org/jira/browse/SPARK-5206
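The workaround discussed in the thread for SPARK-5206 (accumulator and broadcast state is not restored from a checkpoint) is a lazily initialized singleton. A dependency-free sketch of just that holder, using a hypothetical `LazyHolder` class with double-checked locking so that the factory runs at most once per holder:

```scala
// Hypothetical helper: creates a value on first use and then reuses it.
// The value lives only in this JVM, so it is never part of a checkpoint.
final class LazyHolder[A, T <: AnyRef](create: A => T) {
  @volatile private var instance: T = null.asInstanceOf[T]

  // Later calls ignore `arg` and return the cached value.
  def getOrCreate(arg: A): T = {
    // Fast path: no locking once the value exists
    if (instance == null) {
      synchronized {
        // Re-check under the lock so concurrent callers create it only once
        if (instance == null) instance = create(arg)
      }
    }
    instance
  }
}
```

In a streaming job, `A` would be the `SparkContext` taken from `rdd.sparkContext` inside `foreachRDD`, and the holder would be a `val` on a top-level `object`, so that checkpointed closures refer to it statically instead of capturing the value it holds. After a restart the JVM starts with an empty holder, so the broadcast variable (or HBaseContext) is created again on first use rather than read back from the checkpoint.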