Posted to user@spark.apache.org by varun sharma <va...@gmail.com> on 2015/10/19 14:48:07 UTC

Issue in spark batches

Hi,
I am consistently facing this issue in a Spark-Cassandra-Kafka *streaming
job.*
*Spark 1.4.0*
*cassandra connector 1.4.0-M3*
*Issue is:*

I am reading data from *Kafka* using DirectStream, writing to *Cassandra* after
parsing the JSON, and subsequently updating the offsets in *ZooKeeper*.
If the Cassandra cluster is down, the job throws an exception, but the batch that
arrives in that time window is never processed, even though the offsets are
updated in ZooKeeper.
This results in data loss.
Once the Cassandra cluster is back up, the job processes data normally.
Please find attached the screenshots of the hung batches and the code.

*Code:*

data_rdd.foreachRDD(rdd => {
  val stream = rdd
    .map(x => JsonUtility.deserialize(x))
  stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
    StreamModel.getColumns)

  // commit the offsets once everything is done
  ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
})

*I have even tried this variant:*

data_rdd.foreachRDD(rdd => {
  val stream = rdd
    .map(x => JsonUtility.deserialize(x))
  stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
    StreamModel.getColumns)
})

data_rdd.foreachRDD(rdd => {
  // commit the offsets once everything is done
  ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
})

Exception when cassandra cluster is down:
[2015-10-19 12:49:20] [JobScheduler] [ERROR]
[org.apache.spark.streaming.scheduler.JobScheduler] - Error running job
streaming job 1445239140000 ms.3
java.io.IOException: Failed to open native connection to Cassandra at
{......}

-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*

Re: Issue in spark batches

Posted by Tathagata Das <td...@databricks.com>.
Unfortunately, you will have to write that code yourself.

TD
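
A sketch of what such hand-written code might look like for the "block further batches once one has failed" part of the question. `BatchGate` is a hypothetical helper, not part of Spark or the Cassandra connector, and the sketch assumes the save and the offset commit are both placed inside the block passed to `run`.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// Hypothetical helper: once any batch fails, every later batch is skipped.
final class BatchGate {
  private val closed = new AtomicBoolean(false)

  def isClosed: Boolean = closed.get()

  // Runs `body` unless the gate is already closed.
  // Returns true only if `body` ran and completed without throwing.
  def run(body: => Unit): Boolean =
    if (closed.get()) false
    else
      try { body; true }
      catch {
        case NonFatal(_) =>
          closed.set(true) // remember the failure; later batches are skipped
          false
      }
}
```

Inside `foreachRDD`, the body would become something like `gate.run { /* save, then commit offsets */ }`, so that no later batch can advance the offsets past a failed one. The driver could then check `gate.isClosed` and decide whether to shut the job down; how that shutdown is done is left open here.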

On Tue, Oct 20, 2015 at 11:28 PM, varun sharma <va...@gmail.com>
wrote:

> Hi TD,
> Is there any way in Spark to fail or retry a batch when an exception
> occurs, or do I have to write code that explicitly keeps retrying?
> Also, if a batch fails, I want to block further batches from being
> processed, since processing them would make the ZooKeeper offset updates
> inconsistent, and maybe kill the job itself after, let's say, 3 retries.
>
> Any pointers on how to achieve this are appreciated.

Re: Issue in spark batches

Posted by varun sharma <va...@gmail.com>.
Hi TD,
Is there any way in Spark to fail or retry a batch when an exception
occurs, or do I have to write code that explicitly keeps retrying?
Also, if a batch fails, I want to block further batches from being
processed, since processing them would make the ZooKeeper offset updates
inconsistent, and maybe kill the job itself after, let's say, 3 retries.

Any pointers on how to achieve this are appreciated.

On Wed, Oct 21, 2015 at 1:15 AM, Tathagata Das <td...@databricks.com> wrote:

> That is actually a bug in the UI that got fixed in 1.5.1. The batch is
> actually completing with an exception; the UI just does not update correctly.

Re: Issue in spark batches

Posted by Tathagata Das <td...@databricks.com>.
That is actually a bug in the UI that got fixed in 1.5.1. The batch is
actually completing with an exception; the UI just does not update correctly.

On Tue, Oct 20, 2015 at 8:38 AM, varun sharma <va...@gmail.com>
wrote:

> Also, as you can see from the timestamps in the attached image, batches
> arriving after the Cassandra server comes up (21:04) are processed, while
> batches in the hung state (21:03) never get processed.
> So how do I fail those batches so that they can be processed again?

Re: Issue in spark batches

Posted by varun sharma <va...@gmail.com>.
Also, as you can see from the timestamps in the attached image, batches
arriving after the Cassandra server comes up (21:04) are processed, while
batches in the hung state (21:03) never get processed.
So how do I fail those batches so that they can be processed again?

On Tue, Oct 20, 2015 at 9:02 PM, varun sharma <va...@gmail.com>
wrote:

> Hi TD,
> Yes, saveToCassandra throws an exception. How do I fail that task
> explicitly if I catch an exception?
> Right now that batch doesn't fail and remains in a hung state. Is there any
> way I can fail that batch so that it can be tried again?
>
> Thanks
> Varun

Re: Issue in spark batches

Posted by varun sharma <va...@gmail.com>.
Hi TD,
Yes, saveToCassandra throws an exception. How do I fail that task
explicitly if I catch an exception?
Right now that batch doesn't fail and remains in a hung state. Is there any
way I can fail that batch so that it can be tried again?

Thanks
Varun

On Tue, Oct 20, 2015 at 2:50 AM, Tathagata Das <td...@databricks.com> wrote:

> If Cassandra is down, does saveToCassandra throw an exception? If it does,
> you can catch that exception and write your own logic to retry and/or skip
> the offset update. Once the foreachRDD function completes, that batch will
> be internally marked as completed.
>
> TD

Re: Issue in spark batches

Posted by Tathagata Das <td...@databricks.com>.
If Cassandra is down, does saveToCassandra throw an exception? If it does,
you can catch that exception and write your own logic to retry and/or skip
the offset update. Once the foreachRDD function completes, that batch will
be internally marked as completed.

TD
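
A minimal sketch of the catch-and-retry approach described above, assuming the write and the offset commit can each be wrapped in a function. `retry`, `processBatch`, `write`, and `commitOffsets` are hypothetical names, not part of Spark or the Cassandra connector; in the job from this thread, `write` would wrap the `saveToCassandra` call and `commitOffsets` would wrap `ZookeeperManager.updateOffsetsinZk`. The attempt count and delay are illustrative values only.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Run `op` up to `maxAttempts` times, sleeping `delayMs` between attempts.
  // Returns the first Success, or the last Failure once attempts run out.
  @tailrec
  def retry[T](maxAttempts: Int, delayMs: Long)(op: () => T): Try[T] =
    Try(op()) match {
      case s @ Success(_) => s
      case f @ Failure(_) if maxAttempts <= 1 => f
      case Failure(_) =>
        Thread.sleep(delayMs)
        retry(maxAttempts - 1, delayMs)(op)
    }

  // Commit offsets only when the write succeeded; returns true on success.
  def processBatch(write: () => Unit, commitOffsets: () => Unit): Boolean =
    retry(maxAttempts = 3, delayMs = 10L)(write) match {
      case Success(_) =>
        commitOffsets()
        true
      case Failure(_) =>
        false // offsets stay untouched, so the batch is not recorded as done
    }
}
```

Because the retry loop runs inside the function passed to `foreachRDD`, the batch is not marked as completed until the loop returns. What to do when `processBatch` returns false (log, alert, or stop the job) is a separate decision that this sketch leaves to the caller.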
