You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Ahmed Nawar <ah...@gmail.com> on 2015/08/27 20:42:43 UTC

Commit DB Transaction for each partition

Thanks for foreach idea. But once i used it i got empty rdd. I think
because "results" is an iterator.

Yes i know "Map is lazy" but i expected there is solution to force action.

I can not use foreachPartition because i need reuse the new RDD after some
maps.



On Thu, Aug 27, 2015 at 5:11 PM, Cody Koeninger <co...@koeninger.org> wrote:

>
> Map is lazy.  You need an actual action, or nothing will happen.  Use
> foreachPartition, or do an empty foreach after the map.
>
> On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <ah...@gmail.com>
> wrote:
>
>> Dears,
>>
>>     I needs to commit DB Transaction for each partition,Not for each row.
>> below didn't work for me.
>>
>>
>> rdd.mapPartitions(partitionOfRecords => {
>>
>> DBConnectionInit()
>>
>> val results = partitionOfRecords.map(......)
>>
>> DBConnection.commit()
>>
>>
>> })
>>
>>
>>
>> Best regards,
>>
>> Ahmed Atef Nawwar
>>
>> Data Management & Big Data Consultant
>>
>>
>>
>>
>>
>>
>> On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> Your kafka broker died or you otherwise had a rebalance.
>>>
>>> Normally spark retries take care of that.
>>>
>>> Is there something going on with your kafka installation, that rebalance
>>> is taking especially long?
>>>
>>> Yes, increasing backoff / max number of retries will "help", but it's
>>> better to figure out what's going on with kafka.
>>>
>>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <
>>> shushantarora09@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> My streaming application gets killed with below error
>>>>
>>>> 5/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>> kafka.common.NotLeaderForPartitionException,
>>>> kafka.common.NotLeaderForPartitionException,
>>>> kafka.common.NotLeaderForPartitionException,
>>>> kafka.common.NotLeaderForPartitionException,
>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>>>> [testtopic,193]))
>>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs
>>>> for time 1440626120000 ms
>>>> org.apache.spark.SparkException:
>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>> Set([testtopic,115]))
>>>> at
>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>> at
>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>> at
>>>>
>>>>
>>>>
>>>> Kafka params in job logs printed are :
>>>>  value.serializer = class
>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>         key.serializer = class
>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>         block.on.buffer.full = true
>>>>         retry.backoff.ms = 100
>>>>         buffer.memory = 1048576
>>>>         batch.size = 16384
>>>>         metrics.sample.window.ms = 30000
>>>>         metadata.max.age.ms = 300000
>>>>         receive.buffer.bytes = 32768
>>>>         timeout.ms = 30000
>>>>         max.in.flight.requests.per.connection = 5
>>>>         bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>>>         metric.reporters = []
>>>>         client.id =
>>>>         compression.type = none
>>>>         retries = 0
>>>>         max.request.size = 1048576
>>>>         send.buffer.bytes = 131072
>>>>         acks = all
>>>>         reconnect.backoff.ms = 10
>>>>         linger.ms = 0
>>>>         metrics.num.samples = 2
>>>>         metadata.fetch.timeout.ms = 60000
>>>>
>>>>
>>>> Is it kafka broker getting down and job is getting killed ? Whats the
>>>> best way to handle it ?
>>>> Increasing retries and backoff time  wil help and to what values those
>>>> should be set to never have streaming application failure - rather it keep
>>>> on retrying after few seconds and send a event so that my custom code can
>>>> send notification of kafka broker down if its because of that.
>>>>
>>>>
>>>> Thanks
>>>>
>>>>
>>>
>>
>

Re: Commit DB Transaction for each partition

Posted by Ahmed Nawar <ah...@gmail.com>.
Thanks a lot for your support. It is working now.
I wrote it like below


val newRDD = rdd.mapPartitions { partition => {

  val result = partition.map(.....)

  result
}
}

newRDD.foreach {

}


On Thu, Aug 27, 2015 at 10:34 PM, Cody Koeninger <co...@koeninger.org> wrote:

> This job contains a spark output action, and is what I originally meant:
>
>
> rdd.mapPartitions {
>   result
> }.foreach {
>
> }
>
> This job is just a transformation, and won't do anything unless you have
> another output action.  Not to mention, it will exhaust the iterator, as
> you noticed:
>
> rdd.mapPartitions {
>   result.foreach
>   result
> }
>
>
>
> On Thu, Aug 27, 2015 at 2:22 PM, Ahmed Nawar <ah...@gmail.com>
> wrote:
>
>> Yes, of course, I am doing that. But once i added results.foreach(row=>
>> {})   i pot empty RDD.
>>
>>
>>
>> rdd.mapPartitions(partitionOfRecords => {
>>
>> DBConnectionInit()
>>
>> val results = partitionOfRecords.map(......)
>>
>> DBConnection.commit()
>>
>> results.foreach(row=> {})
>>
>> results
>>
>> })
>>
>>
>>
>> On Thu, Aug 27, 2015 at 10:18 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> You need to return an iterator from the closure you provide to
>>> mapPartitions
>>>
>>> On Thu, Aug 27, 2015 at 1:42 PM, Ahmed Nawar <ah...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for foreach idea. But once i used it i got empty rdd. I think
>>>> because "results" is an iterator.
>>>>
>>>> Yes i know "Map is lazy" but i expected there is solution to force
>>>> action.
>>>>
>>>> I can not use foreachPartition because i need reuse the new RDD after
>>>> some maps.
>>>>
>>>>
>>>>
>>>> On Thu, Aug 27, 2015 at 5:11 PM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>>
>>>>>
>>>>> Map is lazy.  You need an actual action, or nothing will happen.  Use
>>>>> foreachPartition, or do an empty foreach after the map.
>>>>>
>>>>> On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <ah...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Dears,
>>>>>>
>>>>>>     I needs to commit DB Transaction for each partition,Not for each
>>>>>> row. below didn't work for me.
>>>>>>
>>>>>>
>>>>>> rdd.mapPartitions(partitionOfRecords => {
>>>>>>
>>>>>> DBConnectionInit()
>>>>>>
>>>>>> val results = partitionOfRecords.map(......)
>>>>>>
>>>>>> DBConnection.commit()
>>>>>>
>>>>>>
>>>>>> })
>>>>>>
>>>>>>
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>> Ahmed Atef Nawwar
>>>>>>
>>>>>> Data Management & Big Data Consultant
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <co...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Your kafka broker died or you otherwise had a rebalance.
>>>>>>>
>>>>>>> Normally spark retries take care of that.
>>>>>>>
>>>>>>> Is there something going on with your kafka installation, that
>>>>>>> rebalance is taking especially long?
>>>>>>>
>>>>>>> Yes, increasing backoff / max number of retries will "help", but
>>>>>>> it's better to figure out what's going on with kafka.
>>>>>>>
>>>>>>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <
>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi
>>>>>>>>
>>>>>>>> My streaming application gets killed with below error
>>>>>>>>
>>>>>>>> 5/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>>>>>>>> [testtopic,193]))
>>>>>>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating
>>>>>>>> jobs for time 1440626120000 ms
>>>>>>>> org.apache.spark.SparkException:
>>>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>>> Set([testtopic,115]))
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>>>>> at
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Kafka params in job logs printed are :
>>>>>>>>  value.serializer = class
>>>>>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>>>>>         key.serializer = class
>>>>>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>>>>>         block.on.buffer.full = true
>>>>>>>>         retry.backoff.ms = 100
>>>>>>>>         buffer.memory = 1048576
>>>>>>>>         batch.size = 16384
>>>>>>>>         metrics.sample.window.ms = 30000
>>>>>>>>         metadata.max.age.ms = 300000
>>>>>>>>         receive.buffer.bytes = 32768
>>>>>>>>         timeout.ms = 30000
>>>>>>>>         max.in.flight.requests.per.connection = 5
>>>>>>>>         bootstrap.servers = [broker1:9092, broker2:9092,
>>>>>>>> broker3:9092]
>>>>>>>>         metric.reporters = []
>>>>>>>>         client.id =
>>>>>>>>         compression.type = none
>>>>>>>>         retries = 0
>>>>>>>>         max.request.size = 1048576
>>>>>>>>         send.buffer.bytes = 131072
>>>>>>>>         acks = all
>>>>>>>>         reconnect.backoff.ms = 10
>>>>>>>>         linger.ms = 0
>>>>>>>>         metrics.num.samples = 2
>>>>>>>>         metadata.fetch.timeout.ms = 60000
>>>>>>>>
>>>>>>>>
>>>>>>>> Is it kafka broker getting down and job is getting killed ? Whats
>>>>>>>> the best way to handle it ?
>>>>>>>> Increasing retries and backoff time  wil help and to what values
>>>>>>>> those should be set to never have streaming application failure - rather it
>>>>>>>> keep on retrying after few seconds and send a event so that my custom code
>>>>>>>> can send notification of kafka broker down if its because of that.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Commit DB Transaction for each partition

Posted by Cody Koeninger <co...@koeninger.org>.
This job contains a spark output action, and is what I originally meant:


rdd.mapPartitions {
  result
}.foreach {

}

This job is just a transformation, and won't do anything unless you have
another output action.  Not to mention, it will exhaust the iterator, as
you noticed:

rdd.mapPartitions {
  result.foreach
  result
}



On Thu, Aug 27, 2015 at 2:22 PM, Ahmed Nawar <ah...@gmail.com> wrote:

> Yes, of course, I am doing that. But once i added results.foreach(row=>
> {})   i pot empty RDD.
>
>
>
> rdd.mapPartitions(partitionOfRecords => {
>
> DBConnectionInit()
>
> val results = partitionOfRecords.map(......)
>
> DBConnection.commit()
>
> results.foreach(row=> {})
>
> results
>
> })
>
>
>
> On Thu, Aug 27, 2015 at 10:18 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> You need to return an iterator from the closure you provide to
>> mapPartitions
>>
>> On Thu, Aug 27, 2015 at 1:42 PM, Ahmed Nawar <ah...@gmail.com>
>> wrote:
>>
>>> Thanks for foreach idea. But once i used it i got empty rdd. I think
>>> because "results" is an iterator.
>>>
>>> Yes i know "Map is lazy" but i expected there is solution to force
>>> action.
>>>
>>> I can not use foreachPartition because i need reuse the new RDD after
>>> some maps.
>>>
>>>
>>>
>>> On Thu, Aug 27, 2015 at 5:11 PM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>>
>>>> Map is lazy.  You need an actual action, or nothing will happen.  Use
>>>> foreachPartition, or do an empty foreach after the map.
>>>>
>>>> On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <ah...@gmail.com>
>>>> wrote:
>>>>
>>>>> Dears,
>>>>>
>>>>>     I needs to commit DB Transaction for each partition,Not for each
>>>>> row. below didn't work for me.
>>>>>
>>>>>
>>>>> rdd.mapPartitions(partitionOfRecords => {
>>>>>
>>>>> DBConnectionInit()
>>>>>
>>>>> val results = partitionOfRecords.map(......)
>>>>>
>>>>> DBConnection.commit()
>>>>>
>>>>>
>>>>> })
>>>>>
>>>>>
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Ahmed Atef Nawwar
>>>>>
>>>>> Data Management & Big Data Consultant
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <co...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Your kafka broker died or you otherwise had a rebalance.
>>>>>>
>>>>>> Normally spark retries take care of that.
>>>>>>
>>>>>> Is there something going on with your kafka installation, that
>>>>>> rebalance is taking especially long?
>>>>>>
>>>>>> Yes, increasing backoff / max number of retries will "help", but it's
>>>>>> better to figure out what's going on with kafka.
>>>>>>
>>>>>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <
>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>> My streaming application gets killed with below error
>>>>>>>
>>>>>>> 5/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>>>>>>> [testtopic,193]))
>>>>>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating
>>>>>>> jobs for time 1440626120000 ms
>>>>>>> org.apache.spark.SparkException:
>>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>> Set([testtopic,115]))
>>>>>>> at
>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>>>> at
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Kafka params in job logs printed are :
>>>>>>>  value.serializer = class
>>>>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>>>>         key.serializer = class
>>>>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>>>>         block.on.buffer.full = true
>>>>>>>         retry.backoff.ms = 100
>>>>>>>         buffer.memory = 1048576
>>>>>>>         batch.size = 16384
>>>>>>>         metrics.sample.window.ms = 30000
>>>>>>>         metadata.max.age.ms = 300000
>>>>>>>         receive.buffer.bytes = 32768
>>>>>>>         timeout.ms = 30000
>>>>>>>         max.in.flight.requests.per.connection = 5
>>>>>>>         bootstrap.servers = [broker1:9092, broker2:9092,
>>>>>>> broker3:9092]
>>>>>>>         metric.reporters = []
>>>>>>>         client.id =
>>>>>>>         compression.type = none
>>>>>>>         retries = 0
>>>>>>>         max.request.size = 1048576
>>>>>>>         send.buffer.bytes = 131072
>>>>>>>         acks = all
>>>>>>>         reconnect.backoff.ms = 10
>>>>>>>         linger.ms = 0
>>>>>>>         metrics.num.samples = 2
>>>>>>>         metadata.fetch.timeout.ms = 60000
>>>>>>>
>>>>>>>
>>>>>>> Is it kafka broker getting down and job is getting killed ? Whats
>>>>>>> the best way to handle it ?
>>>>>>> Increasing retries and backoff time  wil help and to what values
>>>>>>> those should be set to never have streaming application failure - rather it
>>>>>>> keep on retrying after few seconds and send a event so that my custom code
>>>>>>> can send notification of kafka broker down if its because of that.
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Commit DB Transaction for each partition

Posted by Ahmed Nawar <ah...@gmail.com>.
Yes, of course, I am doing that. But once i added results.foreach(row=> {})
  i pot empty RDD.



rdd.mapPartitions(partitionOfRecords => {

DBConnectionInit()

val results = partitionOfRecords.map(......)

DBConnection.commit()

results.foreach(row=> {})

results

})



On Thu, Aug 27, 2015 at 10:18 PM, Cody Koeninger <co...@koeninger.org> wrote:

> You need to return an iterator from the closure you provide to
> mapPartitions
>
> On Thu, Aug 27, 2015 at 1:42 PM, Ahmed Nawar <ah...@gmail.com>
> wrote:
>
>> Thanks for foreach idea. But once i used it i got empty rdd. I think
>> because "results" is an iterator.
>>
>> Yes i know "Map is lazy" but i expected there is solution to force action.
>>
>> I can not use foreachPartition because i need reuse the new RDD after
>> some maps.
>>
>>
>>
>> On Thu, Aug 27, 2015 at 5:11 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>>
>>> Map is lazy.  You need an actual action, or nothing will happen.  Use
>>> foreachPartition, or do an empty foreach after the map.
>>>
>>> On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <ah...@gmail.com>
>>> wrote:
>>>
>>>> Dears,
>>>>
>>>>     I needs to commit DB Transaction for each partition,Not for each
>>>> row. below didn't work for me.
>>>>
>>>>
>>>> rdd.mapPartitions(partitionOfRecords => {
>>>>
>>>> DBConnectionInit()
>>>>
>>>> val results = partitionOfRecords.map(......)
>>>>
>>>> DBConnection.commit()
>>>>
>>>>
>>>> })
>>>>
>>>>
>>>>
>>>> Best regards,
>>>>
>>>> Ahmed Atef Nawwar
>>>>
>>>> Data Management & Big Data Consultant
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Your kafka broker died or you otherwise had a rebalance.
>>>>>
>>>>> Normally spark retries take care of that.
>>>>>
>>>>> Is there something going on with your kafka installation, that
>>>>> rebalance is taking especially long?
>>>>>
>>>>> Yes, increasing backoff / max number of retries will "help", but it's
>>>>> better to figure out what's going on with kafka.
>>>>>
>>>>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <
>>>>> shushantarora09@gmail.com> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> My streaming application gets killed with below error
>>>>>>
>>>>>> 5/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>>>>>> [testtopic,193]))
>>>>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs
>>>>>> for time 1440626120000 ms
>>>>>> org.apache.spark.SparkException:
>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>> Set([testtopic,115]))
>>>>>> at
>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>>>> at
>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>>>> at
>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>> at
>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>>> at
>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>>> at
>>>>>>
>>>>>>
>>>>>>
>>>>>> Kafka params in job logs printed are :
>>>>>>  value.serializer = class
>>>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>>>         key.serializer = class
>>>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>>>         block.on.buffer.full = true
>>>>>>         retry.backoff.ms = 100
>>>>>>         buffer.memory = 1048576
>>>>>>         batch.size = 16384
>>>>>>         metrics.sample.window.ms = 30000
>>>>>>         metadata.max.age.ms = 300000
>>>>>>         receive.buffer.bytes = 32768
>>>>>>         timeout.ms = 30000
>>>>>>         max.in.flight.requests.per.connection = 5
>>>>>>         bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>>>>>         metric.reporters = []
>>>>>>         client.id =
>>>>>>         compression.type = none
>>>>>>         retries = 0
>>>>>>         max.request.size = 1048576
>>>>>>         send.buffer.bytes = 131072
>>>>>>         acks = all
>>>>>>         reconnect.backoff.ms = 10
>>>>>>         linger.ms = 0
>>>>>>         metrics.num.samples = 2
>>>>>>         metadata.fetch.timeout.ms = 60000
>>>>>>
>>>>>>
>>>>>> Is it kafka broker getting down and job is getting killed ? Whats the
>>>>>> best way to handle it ?
>>>>>> Increasing retries and backoff time  wil help and to what values
>>>>>> those should be set to never have streaming application failure - rather it
>>>>>> keep on retrying after few seconds and send a event so that my custom code
>>>>>> can send notification of kafka broker down if its because of that.
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Commit DB Transaction for each partition

Posted by Cody Koeninger <co...@koeninger.org>.
You need to return an iterator from the closure you provide to mapPartitions

On Thu, Aug 27, 2015 at 1:42 PM, Ahmed Nawar <ah...@gmail.com> wrote:

> Thanks for foreach idea. But once i used it i got empty rdd. I think
> because "results" is an iterator.
>
> Yes i know "Map is lazy" but i expected there is solution to force action.
>
> I can not use foreachPartition because i need reuse the new RDD after some
> maps.
>
>
>
> On Thu, Aug 27, 2015 at 5:11 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>>
>> Map is lazy.  You need an actual action, or nothing will happen.  Use
>> foreachPartition, or do an empty foreach after the map.
>>
>> On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <ah...@gmail.com>
>> wrote:
>>
>>> Dears,
>>>
>>>     I needs to commit DB Transaction for each partition,Not for each
>>> row. below didn't work for me.
>>>
>>>
>>> rdd.mapPartitions(partitionOfRecords => {
>>>
>>> DBConnectionInit()
>>>
>>> val results = partitionOfRecords.map(......)
>>>
>>> DBConnection.commit()
>>>
>>>
>>> })
>>>
>>>
>>>
>>> Best regards,
>>>
>>> Ahmed Atef Nawwar
>>>
>>> Data Management & Big Data Consultant
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> Your kafka broker died or you otherwise had a rebalance.
>>>>
>>>> Normally spark retries take care of that.
>>>>
>>>> Is there something going on with your kafka installation, that
>>>> rebalance is taking especially long?
>>>>
>>>> Yes, increasing backoff / max number of retries will "help", but it's
>>>> better to figure out what's going on with kafka.
>>>>
>>>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <
>>>> shushantarora09@gmail.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> My streaming application gets killed with below error
>>>>>
>>>>> 5/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>> kafka.common.NotLeaderForPartitionException,
>>>>> kafka.common.NotLeaderForPartitionException,
>>>>> kafka.common.NotLeaderForPartitionException,
>>>>> kafka.common.NotLeaderForPartitionException,
>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>>>>> [testtopic,193]))
>>>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs
>>>>> for time 1440626120000 ms
>>>>> org.apache.spark.SparkException:
>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>> Set([testtopic,115]))
>>>>> at
>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>>> at
>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>>> at
>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>> at
>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>> at
>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>> at
>>>>>
>>>>>
>>>>>
>>>>> Kafka params in job logs printed are :
>>>>>  value.serializer = class
>>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>>         key.serializer = class
>>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>>         block.on.buffer.full = true
>>>>>         retry.backoff.ms = 100
>>>>>         buffer.memory = 1048576
>>>>>         batch.size = 16384
>>>>>         metrics.sample.window.ms = 30000
>>>>>         metadata.max.age.ms = 300000
>>>>>         receive.buffer.bytes = 32768
>>>>>         timeout.ms = 30000
>>>>>         max.in.flight.requests.per.connection = 5
>>>>>         bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>>>>         metric.reporters = []
>>>>>         client.id =
>>>>>         compression.type = none
>>>>>         retries = 0
>>>>>         max.request.size = 1048576
>>>>>         send.buffer.bytes = 131072
>>>>>         acks = all
>>>>>         reconnect.backoff.ms = 10
>>>>>         linger.ms = 0
>>>>>         metrics.num.samples = 2
>>>>>         metadata.fetch.timeout.ms = 60000
>>>>>
>>>>>
>>>>> Is it kafka broker getting down and job is getting killed ? Whats the
>>>>> best way to handle it ?
>>>>> Increasing retries and backoff time  wil help and to what values those
>>>>> should be set to never have streaming application failure - rather it keep
>>>>> on retrying after few seconds and send a event so that my custom code can
>>>>> send notification of kafka broker down if its because of that.
>>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>
>>>
>>
>