You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@geode.apache.org by trung kien <ki...@gmail.com> on 2018/07/17 06:15:22 UTC

Spark streaming output to geode

Dear all,

I am looking for a correct way to write output of a spark streaming job to
a geode's region

I'm trying following codes:

results.foreachRDD(
                    rdd -> rdd.foreachPartition(
                            records ->
                            {
                                    //STEP 1
                                    ClientCache cache = new
ClientCacheFactory().addPoolLocator(locator, 10334)
                                    .set("log-level", "WARN").create();

                                    //STEP 2
                                    Region< String, Transaction > region =
cache.<String, Transaction
>createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");

                                    //STEP 3
                                    processing(region,records)

                                    //STEP 4
                                    cache.close();
                            }
                    )
                );

1/ If I'm using above code, then I will get following errors:
               org.apache.geode.cache.CacheClosedException: The cache is
closed.
               org.apache.geode.cache.RegionExistsException: /transactions

        I think Spark is trying to re-use the connection to geode, in
second batch when connection is already closed it raise exceptions

2/ I comment out the //STEP 4 (cache.close())
              The spark job running BUT THE DATA IS MISSING RANDOMLY

3/ I'm writing stanalone application to repeat step 1 -> 4 in while loop
              while(true) {
                   //STEP 1,
                   //STEP 2
                   //STEP 3
                   //STEP 4
              }

          The application is running well without exception or loosing data.


Any thoughts is really appriciated




-- 
Thanks
Kien

Re: Spark streaming output to geode

Posted by trung kien <ki...@gmail.com>.
Looks like the design of the connector will work for me.

I will give it a try.

On Tue, Jul 17, 2018 at 4:59 PM Jason Huynh <jh...@pivotal.io> wrote:

> Not sure if this helps any, there used to be a Geode Spark Connector
> module that was removed with ticket GEODE-194.  I believe someone has
> adopted/‘forked’ the code and maintaining it here:
> https://github.com/Pivotal-Field-Engineering/geode-spark-connector
>
>
> I think the connection with workers issue was addressed in the connector.
> Maybe looking at that code might help with what you are doing? (or even
> using the connector as is, although I am not sure what Spark version it is
> compatible with)
>
> On Tue, Jul 17, 2018 at 2:47 PM trung kien <ki...@gmail.com> wrote:
>
>> Ah, that make sense.
>> i will give it a try and let you know if it’s work.
>>
>> But my question on Client Cache Pool still stand.
>> Wherether the pool will be shared among workers.
>> Can worker #1 can close the cache of worker #2.
>>
>> On Tue, Jul 17, 2018 at 4:24 PM Bradford Boyle <bb...@pivotal.io> wrote:
>>
>>> Hi Trung,
>>>
>>> One pattern that comes to mind is to create a `CacheProvider` object
>>> that lazily creates the cache and region objects:
>>>
>>> ```scala
>>> object CacheProvider {
>>> lazy val cache = new ClientCacheFactory()...
>>> lazy val region = ....
>>> }
>>> ```
>>>
>>> Then update your `foreachPartition` function to use this object
>>>
>>> ```scala
>>> records -> {
>>>   val cache = CacheProvider.cache
>>>   val region = CacheProvider.region
>>>   ...
>>> ```
>>>
>>> This way the cache is not created until it is needed on the worker.
>>> You'll want to delay closing the cache until your Spark application
>>> ends; there are Spark event listeners you can register to cleanup
>>> resources. I have not tested this code myself, so it might need some
>>> tweaking.
>>>
>>> --Bradford
>>>
>>>
>>> On Tue, Jul 17, 2018 at 2:10 PM trung kien <ki...@gmail.com> wrote:
>>> >
>>> > Hi John,
>>> >
>>> > The problem is that outside of rdd.foreachPartition block, the code is
>>> not executed by single worker.
>>> > In Spark streaming, each parition will be handle by a worker in
>>> parallel.
>>> > So multiple workers will work on its own processing(region,records) at
>>> the same time.
>>> >
>>> > If I open connection outside of that block, it's not passed through
>>> workers.
>>> > The recommend way is creating connection inside that block
>>> > http://spark.apache.org/docs/1.2.2/streaming-programming-guide.html
>>> (Design Patterns for using foreachRDD)
>>> >
>>> > dstream.foreachRDD { rdd =>
>>> >   rdd.foreachPartition { partitionOfRecords =>
>>> >     val connection = createNewConnection()
>>> >     partitionOfRecords.foreach(record => connection.send(record))
>>> >     connection.close()
>>> >   }
>>> > }
>>> >
>>> > What's I don't understand is we seems to create the connection
>>> locally, create then close in the same block.
>>> > Is there any chance that the connection will be re-used by other
>>> workers (on other machines)?
>>> > For example, I have 2 workers running in parallel all of them will
>>> create connection pool to locator at pretty the same time
>>> > Is the connection pool shared among all my workers? so the close on
>>> worker #1 will impact the connection on worker #2.
>>> >
>>> > On Tue, Jul 17, 2018 at 11:55 AM John Blum <jb...@pivotal.io> wrote:
>>> >>
>>> >> Hi Trung-
>>> >>
>>> >> You definitely should not be opening and closing a connection each
>>> time you process a record; that is excessive.
>>> >>
>>> >> I can tell you right now that, between clientCache.close(), say, in
>>> the first iteration, and ClientCacheFactory.create() in the second
>>> iteration, it is unlikely that Geode completely "closed" the ClientCache
>>> instance (releasing all the resources) before the ClientCache instance is
>>> recreated in the subsequent iteration, hence the reason you hit a
>>> CacheClosedException or RegionExistsException.
>>> >>
>>> >> I have run into similar problems in a single test class, where I need
>>> to cycle the cache instance between test case methods in the same test
>>> class (suite).
>>> >>
>>> >> Having said that, it would be better to structure the code as...
>>> >>
>>> >> ClientCache cache = new ClientCacheFactory()
>>> >>     .addPoolLocator(locator, 10334)
>>> >>     .set("log-level", "WARN")
>>> >>     .create();
>>> >>
>>> >> Region< String, Transaction > region =
>>> >>     cache.<String, Transaction
>>> >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
>>> >>
>>> >> try {
>>> >>
>>> >>     results.foreachRDD(
>>> >>         rdd -> rdd.foreachPartition(
>>> >>             records -> {
>>> >>                 processing(region,records)
>>> >>             }
>>> >>         )
>>> >>     );
>>> >> }
>>> >> finally {
>>> >>     cache.close();
>>> >> }
>>> >>
>>> >> Why do you need to open/close the connection for each record/batch
>>> anyhow?
>>> >>
>>> >> Regards,
>>> >> -John
>>> >>
>>> >>
>>> >> On Mon, Jul 16, 2018 at 11:15 PM, trung kien <ki...@gmail.com>
>>> wrote:
>>> >>>
>>> >>> Dear all,
>>> >>>
>>> >>> I am looking for a correct way to write output of a spark streaming
>>> job to a geode's region
>>> >>>
>>> >>> I'm trying following codes:
>>> >>>
>>> >>> results.foreachRDD(
>>> >>>                     rdd -> rdd.foreachPartition(
>>> >>>                             records ->
>>> >>>                             {
>>> >>>                                     //STEP 1
>>> >>>                                     ClientCache cache = new
>>> ClientCacheFactory().addPoolLocator(locator, 10334)
>>> >>>                                     .set("log-level",
>>> "WARN").create();
>>> >>>
>>> >>>                                     //STEP 2
>>> >>>                                     Region< String, Transaction >
>>> region = cache.<String, Transaction
>>> >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
>>> >>>
>>> >>>                                     //STEP 3
>>> >>>                                     processing(region,records)
>>> >>>
>>> >>>                                     //STEP 4
>>> >>>                                     cache.close();
>>> >>>                             }
>>> >>>                     )
>>> >>>                 );
>>> >>>
>>> >>> 1/ If I'm using above code, then I will get following errors:
>>> >>>                org.apache.geode.cache.CacheClosedException: The
>>> cache is closed.
>>> >>>                org.apache.geode.cache.RegionExistsException:
>>> /transactions
>>> >>>
>>> >>>         I think Spark is trying to re-use the connection to geode,
>>> in second batch when connection is already closed it raise exceptions
>>> >>>
>>> >>> 2/ I comment out the //STEP 4 (cache.close())
>>> >>>               The spark job running BUT THE DATA IS MISSING RANDOMLY
>>> >>>
>>> >>> 3/ I'm writing stanalone application to repeat step 1 -> 4 in while
>>> loop
>>> >>>               while(true) {
>>> >>>                    //STEP 1,
>>> >>>                    //STEP 2
>>> >>>                    //STEP 3
>>> >>>                    //STEP 4
>>> >>>               }
>>> >>>
>>> >>>           The application is running well without exception or
>>> loosing data.
>>> >>>
>>> >>>
>>> >>> Any thoughts is really appriciated
>>> >>>
>>> >>>
>>> >>>
>>> >>>
>>> >>> --
>>> >>> Thanks
>>> >>> Kien
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> --
>>> >> -John
>>> >> john.blum10101 (skype)
>>> >
>>> > --
>>> > Thanks
>>> > Kien
>>>
>> --
>> Thanks
>> Kien
>>
> --
Thanks
Kien

Re: Spark streaming output to geode

Posted by Jason Huynh <jh...@pivotal.io>.
Not sure if this helps any, there used to be a Geode Spark Connector module
that was removed with ticket GEODE-194.  I believe someone has
adopted/‘forked’ the code and maintaining it here:
https://github.com/Pivotal-Field-Engineering/geode-spark-connector


I think the connection with workers issue was addressed in the
connector.  Maybe
looking at that code might help with what you are doing? (or even using the
connector as is, although I am not sure what Spark version it is compatible
with)

On Tue, Jul 17, 2018 at 2:47 PM trung kien <ki...@gmail.com> wrote:

> Ah, that make sense.
> i will give it a try and let you know if it’s work.
>
> But my question on Client Cache Pool still stand.
> Wherether the pool will be shared among workers.
> Can worker #1 can close the cache of worker #2.
>
> On Tue, Jul 17, 2018 at 4:24 PM Bradford Boyle <bb...@pivotal.io> wrote:
>
>> Hi Trung,
>>
>> One pattern that comes to mind is to create a `CacheProvider` object
>> that lazily creates the cache and region objects:
>>
>> ```scala
>> object CacheProvider {
>> lazy val cache = new ClientCacheFactory()...
>> lazy val region = ....
>> }
>> ```
>>
>> Then update your `foreachPartition` function to use this object
>>
>> ```scala
>> records -> {
>>   val cache = CacheProvider.cache
>>   val region = CacheProvider.region
>>   ...
>> ```
>>
>> This way the cache is not created until it is needed on the worker.
>> You'll want to delay closing the cache until your Spark application
>> ends; there are Spark event listeners you can register to cleanup
>> resources. I have not tested this code myself, so it might need some
>> tweaking.
>>
>> --Bradford
>>
>>
>> On Tue, Jul 17, 2018 at 2:10 PM trung kien <ki...@gmail.com> wrote:
>> >
>> > Hi John,
>> >
>> > The problem is that outside of rdd.foreachPartition block, the code is
>> not executed by single worker.
>> > In Spark streaming, each parition will be handle by a worker in
>> parallel.
>> > So multiple workers will work on its own processing(region,records) at
>> the same time.
>> >
>> > If I open connection outside of that block, it's not passed through
>> workers.
>> > The recommend way is creating connection inside that block
>> > http://spark.apache.org/docs/1.2.2/streaming-programming-guide.html
>> (Design Patterns for using foreachRDD)
>> >
>> > dstream.foreachRDD { rdd =>
>> >   rdd.foreachPartition { partitionOfRecords =>
>> >     val connection = createNewConnection()
>> >     partitionOfRecords.foreach(record => connection.send(record))
>> >     connection.close()
>> >   }
>> > }
>> >
>> > What's I don't understand is we seems to create the connection locally,
>> create then close in the same block.
>> > Is there any chance that the connection will be re-used by other
>> workers (on other machines)?
>> > For example, I have 2 workers running in parallel all of them will
>> create connection pool to locator at pretty the same time
>> > Is the connection pool shared among all my workers? so the close on
>> worker #1 will impact the connection on worker #2.
>> >
>> > On Tue, Jul 17, 2018 at 11:55 AM John Blum <jb...@pivotal.io> wrote:
>> >>
>> >> Hi Trung-
>> >>
>> >> You definitely should not be opening and closing a connection each
>> time you process a record; that is excessive.
>> >>
>> >> I can tell you right now that, between clientCache.close(), say, in
>> the first iteration, and ClientCacheFactory.create() in the second
>> iteration, it is unlikely that Geode completely "closed" the ClientCache
>> instance (releasing all the resources) before the ClientCache instance is
>> recreated in the subsequent iteration, hence the reason you hit a
>> CacheClosedException or RegionExistsException.
>> >>
>> >> I have run into similar problems in a single test class, where I need
>> to cycle the cache instance between test case methods in the same test
>> class (suite).
>> >>
>> >> Having said that, it would be better to structure the code as...
>> >>
>> >> ClientCache cache = new ClientCacheFactory()
>> >>     .addPoolLocator(locator, 10334)
>> >>     .set("log-level", "WARN")
>> >>     .create();
>> >>
>> >> Region< String, Transaction > region =
>> >>     cache.<String, Transaction
>> >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
>> >>
>> >> try {
>> >>
>> >>     results.foreachRDD(
>> >>         rdd -> rdd.foreachPartition(
>> >>             records -> {
>> >>                 processing(region,records)
>> >>             }
>> >>         )
>> >>     );
>> >> }
>> >> finally {
>> >>     cache.close();
>> >> }
>> >>
>> >> Why do you need to open/close the connection for each record/batch
>> anyhow?
>> >>
>> >> Regards,
>> >> -John
>> >>
>> >>
>> >> On Mon, Jul 16, 2018 at 11:15 PM, trung kien <ki...@gmail.com>
>> wrote:
>> >>>
>> >>> Dear all,
>> >>>
>> >>> I am looking for a correct way to write output of a spark streaming
>> job to a geode's region
>> >>>
>> >>> I'm trying following codes:
>> >>>
>> >>> results.foreachRDD(
>> >>>                     rdd -> rdd.foreachPartition(
>> >>>                             records ->
>> >>>                             {
>> >>>                                     //STEP 1
>> >>>                                     ClientCache cache = new
>> ClientCacheFactory().addPoolLocator(locator, 10334)
>> >>>                                     .set("log-level",
>> "WARN").create();
>> >>>
>> >>>                                     //STEP 2
>> >>>                                     Region< String, Transaction >
>> region = cache.<String, Transaction
>> >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
>> >>>
>> >>>                                     //STEP 3
>> >>>                                     processing(region,records)
>> >>>
>> >>>                                     //STEP 4
>> >>>                                     cache.close();
>> >>>                             }
>> >>>                     )
>> >>>                 );
>> >>>
>> >>> 1/ If I'm using above code, then I will get following errors:
>> >>>                org.apache.geode.cache.CacheClosedException: The cache
>> is closed.
>> >>>                org.apache.geode.cache.RegionExistsException:
>> /transactions
>> >>>
>> >>>         I think Spark is trying to re-use the connection to geode, in
>> second batch when connection is already closed it raise exceptions
>> >>>
>> >>> 2/ I comment out the //STEP 4 (cache.close())
>> >>>               The spark job running BUT THE DATA IS MISSING RANDOMLY
>> >>>
>> >>> 3/ I'm writing stanalone application to repeat step 1 -> 4 in while
>> loop
>> >>>               while(true) {
>> >>>                    //STEP 1,
>> >>>                    //STEP 2
>> >>>                    //STEP 3
>> >>>                    //STEP 4
>> >>>               }
>> >>>
>> >>>           The application is running well without exception or
>> loosing data.
>> >>>
>> >>>
>> >>> Any thoughts is really appriciated
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> Thanks
>> >>> Kien
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> -John
>> >> john.blum10101 (skype)
>> >
>> > --
>> > Thanks
>> > Kien
>>
> --
> Thanks
> Kien
>

Re: Spark streaming output to geode

Posted by trung kien <ki...@gmail.com>.
Ah, that make sense.
i will give it a try and let you know if it’s work.

But my question on Client Cache Pool still stand.
Wherether the pool will be shared among workers.
Can worker #1 can close the cache of worker #2.

On Tue, Jul 17, 2018 at 4:24 PM Bradford Boyle <bb...@pivotal.io> wrote:

> Hi Trung,
>
> One pattern that comes to mind is to create a `CacheProvider` object
> that lazily creates the cache and region objects:
>
> ```scala
> object CacheProvider {
> lazy val cache = new ClientCacheFactory()...
> lazy val region = ....
> }
> ```
>
> Then update your `foreachPartition` function to use this object
>
> ```scala
> records -> {
>   val cache = CacheProvider.cache
>   val region = CacheProvider.region
>   ...
> ```
>
> This way the cache is not created until it is needed on the worker.
> You'll want to delay closing the cache until your Spark application
> ends; there are Spark event listeners you can register to cleanup
> resources. I have not tested this code myself, so it might need some
> tweaking.
>
> --Bradford
>
>
> On Tue, Jul 17, 2018 at 2:10 PM trung kien <ki...@gmail.com> wrote:
> >
> > Hi John,
> >
> > The problem is that outside of rdd.foreachPartition block, the code is
> not executed by single worker.
> > In Spark streaming, each parition will be handle by a worker in parallel.
> > So multiple workers will work on its own processing(region,records) at
> the same time.
> >
> > If I open connection outside of that block, it's not passed through
> workers.
> > The recommend way is creating connection inside that block
> > http://spark.apache.org/docs/1.2.2/streaming-programming-guide.html
> (Design Patterns for using foreachRDD)
> >
> > dstream.foreachRDD { rdd =>
> >   rdd.foreachPartition { partitionOfRecords =>
> >     val connection = createNewConnection()
> >     partitionOfRecords.foreach(record => connection.send(record))
> >     connection.close()
> >   }
> > }
> >
> > What's I don't understand is we seems to create the connection locally,
> create then close in the same block.
> > Is there any chance that the connection will be re-used by other workers
> (on other machines)?
> > For example, I have 2 workers running in parallel all of them will
> create connection pool to locator at pretty the same time
> > Is the connection pool shared among all my workers? so the close on
> worker #1 will impact the connection on worker #2.
> >
> > On Tue, Jul 17, 2018 at 11:55 AM John Blum <jb...@pivotal.io> wrote:
> >>
> >> Hi Trung-
> >>
> >> You definitely should not be opening and closing a connection each time
> you process a record; that is excessive.
> >>
> >> I can tell you right now that, between clientCache.close(), say, in the
> first iteration, and ClientCacheFactory.create() in the second iteration,
> it is unlikely that Geode completely "closed" the ClientCache instance
> (releasing all the resources) before the ClientCache instance is recreated
> in the subsequent iteration, hence the reason you hit a
> CacheClosedException or RegionExistsException.
> >>
> >> I have run into similar problems in a single test class, where I need
> to cycle the cache instance between test case methods in the same test
> class (suite).
> >>
> >> Having said that, it would be better to structure the code as...
> >>
> >> ClientCache cache = new ClientCacheFactory()
> >>     .addPoolLocator(locator, 10334)
> >>     .set("log-level", "WARN")
> >>     .create();
> >>
> >> Region< String, Transaction > region =
> >>     cache.<String, Transaction
> >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
> >>
> >> try {
> >>
> >>     results.foreachRDD(
> >>         rdd -> rdd.foreachPartition(
> >>             records -> {
> >>                 processing(region,records)
> >>             }
> >>         )
> >>     );
> >> }
> >> finally {
> >>     cache.close();
> >> }
> >>
> >> Why do you need to open/close the connection for each record/batch
> anyhow?
> >>
> >> Regards,
> >> -John
> >>
> >>
> >> On Mon, Jul 16, 2018 at 11:15 PM, trung kien <ki...@gmail.com>
> wrote:
> >>>
> >>> Dear all,
> >>>
> >>> I am looking for a correct way to write output of a spark streaming
> job to a geode's region
> >>>
> >>> I'm trying following codes:
> >>>
> >>> results.foreachRDD(
> >>>                     rdd -> rdd.foreachPartition(
> >>>                             records ->
> >>>                             {
> >>>                                     //STEP 1
> >>>                                     ClientCache cache = new
> ClientCacheFactory().addPoolLocator(locator, 10334)
> >>>                                     .set("log-level", "WARN").create();
> >>>
> >>>                                     //STEP 2
> >>>                                     Region< String, Transaction >
> region = cache.<String, Transaction
> >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
> >>>
> >>>                                     //STEP 3
> >>>                                     processing(region,records)
> >>>
> >>>                                     //STEP 4
> >>>                                     cache.close();
> >>>                             }
> >>>                     )
> >>>                 );
> >>>
> >>> 1/ If I'm using above code, then I will get following errors:
> >>>                org.apache.geode.cache.CacheClosedException: The cache
> is closed.
> >>>                org.apache.geode.cache.RegionExistsException:
> /transactions
> >>>
> >>>         I think Spark is trying to re-use the connection to geode, in
> second batch when connection is already closed it raise exceptions
> >>>
> >>> 2/ I comment out the //STEP 4 (cache.close())
> >>>               The spark job running BUT THE DATA IS MISSING RANDOMLY
> >>>
> >>> 3/ I'm writing stanalone application to repeat step 1 -> 4 in while
> loop
> >>>               while(true) {
> >>>                    //STEP 1,
> >>>                    //STEP 2
> >>>                    //STEP 3
> >>>                    //STEP 4
> >>>               }
> >>>
> >>>           The application is running well without exception or loosing
> data.
> >>>
> >>>
> >>> Any thoughts is really appriciated
> >>>
> >>>
> >>>
> >>>
> >>> --
> >>> Thanks
> >>> Kien
> >>
> >>
> >>
> >>
> >> --
> >> -John
> >> john.blum10101 (skype)
> >
> > --
> > Thanks
> > Kien
>
-- 
Thanks
Kien

Re: Spark streaming output to geode

Posted by Bradford Boyle <bb...@pivotal.io>.
Hi Trung,

One pattern that comes to mind is to create a `CacheProvider` object
that lazily creates the cache and region objects:

```scala
object CacheProvider {
lazy val cache = new ClientCacheFactory()...
lazy val region = ....
}
```

Then update your `foreachPartition` function to use this object

```scala
records -> {
  val cache = CacheProvider.cache
  val region = CacheProvider.region
  ...
```

This way the cache is not created until it is needed on the worker.
You'll want to delay closing the cache until your Spark application
ends; there are Spark event listeners you can register to cleanup
resources. I have not tested this code myself, so it might need some
tweaking.

--Bradford


On Tue, Jul 17, 2018 at 2:10 PM trung kien <ki...@gmail.com> wrote:
>
> Hi John,
>
> The problem is that outside of rdd.foreachPartition block, the code is not executed by single worker.
> In Spark streaming, each parition will be handle by a worker in parallel.
> So multiple workers will work on its own processing(region,records) at the same time.
>
> If I open connection outside of that block, it's not passed through workers.
> The recommend way is creating connection inside that block
> http://spark.apache.org/docs/1.2.2/streaming-programming-guide.html (Design Patterns for using foreachRDD)
>
> dstream.foreachRDD { rdd =>
>   rdd.foreachPartition { partitionOfRecords =>
>     val connection = createNewConnection()
>     partitionOfRecords.foreach(record => connection.send(record))
>     connection.close()
>   }
> }
>
> What's I don't understand is we seems to create the connection locally, create then close in the same block.
> Is there any chance that the connection will be re-used by other workers (on other machines)?
> For example, I have 2 workers running in parallel all of them will create connection pool to locator at pretty the same time
> Is the connection pool shared among all my workers? so the close on worker #1 will impact the connection on worker #2.
>
> On Tue, Jul 17, 2018 at 11:55 AM John Blum <jb...@pivotal.io> wrote:
>>
>> Hi Trung-
>>
>> You definitely should not be opening and closing a connection each time you process a record; that is excessive.
>>
>> I can tell you right now that, between clientCache.close(), say, in the first iteration, and ClientCacheFactory.create() in the second iteration, it is unlikely that Geode completely "closed" the ClientCache instance (releasing all the resources) before the ClientCache instance is recreated in the subsequent iteration, hence the reason you hit a CacheClosedException or RegionExistsException.
>>
>> I have run into similar problems in a single test class, where I need to cycle the cache instance between test case methods in the same test class (suite).
>>
>> Having said that, it would be better to structure the code as...
>>
>> ClientCache cache = new ClientCacheFactory()
>>     .addPoolLocator(locator, 10334)
>>     .set("log-level", "WARN")
>>     .create();
>>
>> Region< String, Transaction > region =
>>     cache.<String, Transaction >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
>>
>> try {
>>
>>     results.foreachRDD(
>>         rdd -> rdd.foreachPartition(
>>             records -> {
>>                 processing(region,records)
>>             }
>>         )
>>     );
>> }
>> finally {
>>     cache.close();
>> }
>>
>> Why do you need to open/close the connection for each record/batch anyhow?
>>
>> Regards,
>> -John
>>
>>
>> On Mon, Jul 16, 2018 at 11:15 PM, trung kien <ki...@gmail.com> wrote:
>>>
>>> Dear all,
>>>
>>> I am looking for a correct way to write output of a spark streaming job to a geode's region
>>>
>>> I'm trying following codes:
>>>
>>> results.foreachRDD(
>>>                     rdd -> rdd.foreachPartition(
>>>                             records ->
>>>                             {
>>>                                     //STEP 1
>>>                                     ClientCache cache = new ClientCacheFactory().addPoolLocator(locator, 10334)
>>>                                     .set("log-level", "WARN").create();
>>>
>>>                                     //STEP 2
>>>                                     Region< String, Transaction > region = cache.<String, Transaction >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
>>>
>>>                                     //STEP 3
>>>                                     processing(region,records)
>>>
>>>                                     //STEP 4
>>>                                     cache.close();
>>>                             }
>>>                     )
>>>                 );
>>>
>>> 1/ If I'm using above code, then I will get following errors:
>>>                org.apache.geode.cache.CacheClosedException: The cache is closed.
>>>                org.apache.geode.cache.RegionExistsException: /transactions
>>>
>>>         I think Spark is trying to re-use the connection to geode, in second batch when connection is already closed it raise exceptions
>>>
>>> 2/ I comment out the //STEP 4 (cache.close())
>>>               The spark job running BUT THE DATA IS MISSING RANDOMLY
>>>
>>> 3/ I'm writing stanalone application to repeat step 1 -> 4 in while loop
>>>               while(true) {
>>>                    //STEP 1,
>>>                    //STEP 2
>>>                    //STEP 3
>>>                    //STEP 4
>>>               }
>>>
>>>           The application is running well without exception or loosing data.
>>>
>>>
>>> Any thoughts is really appriciated
>>>
>>>
>>>
>>>
>>> --
>>> Thanks
>>> Kien
>>
>>
>>
>>
>> --
>> -John
>> john.blum10101 (skype)
>
> --
> Thanks
> Kien

Re: Spark streaming output to geode

Posted by trung kien <ki...@gmail.com>.
Hi John,

The problem is that outside of rdd.foreachPartition block, the code is not
executed by single worker.
In Spark streaming, each parition will be handle by a worker in parallel.
So multiple workers will work on its own processing(region,records) at the
same time.

If I open connection outside of that block, it's not passed through workers.
The recommend way is creating connection inside that block
http://spark.apache.org/docs/1.2.2/streaming-programming-guide.html (Design
Patterns for using foreachRDD)

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

What's I don't understand is we seems to create the connection locally,
create then close in the same block.
Is there any chance that the connection will be re-used by other workers
(on other machines)?
For example, I have 2 workers running in parallel all of them will create
connection pool to locator at pretty the same time
Is the connection pool shared among all my workers? so the close on worker
#1 will impact the connection on worker #2.

On Tue, Jul 17, 2018 at 11:55 AM John Blum <jb...@pivotal.io> wrote:

> Hi Trung-
>
> You definitely should not be opening and closing a connection each time
> you process a record; that is excessive.
>
> I can tell you right now that, between clientCache.close(), say, in the
> first iteration, and ClientCacheFactory.create() in the second iteration,
> it is unlikely that Geode completely "closed" the ClientCache instance
> (releasing all the resources) before the ClientCache instance is
> recreated in the subsequent iteration, hence the reason you hit a
> CacheClosedException or RegionExistsException.
>
> I have run into similar problems in a single test class, where I need to
> cycle the cache instance between test case methods in the same test class
> (suite).
>
> Having said that, it would be better to structure the code as...
>
> ClientCache cache = new ClientCacheFactory()
>     .addPoolLocator(locator, 10334)
>     .set("log-level", "WARN")
>     .create();
>
> Region< String, Transaction > region =
>     cache.<String, Transaction
> >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
>
> try {
>
>     results.foreachRDD(
>         rdd -> rdd.foreachPartition(
>             records -> {
>                 processing(region,records)
>             }
>         )
>     );
> }
> finally {
>     cache.close();
> }
>
> Why do you need to open/close the connection for each record/batch anyhow?
>
> Regards,
> -John
>
>
> On Mon, Jul 16, 2018 at 11:15 PM, trung kien <ki...@gmail.com> wrote:
>
>> Dear all,
>>
>> I am looking for a correct way to write output of a spark streaming job
>> to a geode's region
>>
>> I'm trying following codes:
>>
>> results.foreachRDD(
>>                     rdd -> rdd.foreachPartition(
>>                             records ->
>>                             {
>>                                     //STEP 1
>>                                     ClientCache cache = new
>> ClientCacheFactory().addPoolLocator(locator, 10334)
>>                                     .set("log-level", "WARN").create();
>>
>>                                     //STEP 2
>>                                     Region< String, Transaction > region
>> = cache.<String, Transaction
>> >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
>>
>>                                     //STEP 3
>>                                     processing(region,records)
>>
>>                                     //STEP 4
>>                                     cache.close();
>>                             }
>>                     )
>>                 );
>>
>> 1/ If I'm using above code, then I will get following errors:
>>                org.apache.geode.cache.CacheClosedException: The cache is
>> closed.
>>                org.apache.geode.cache.RegionExistsException: /
>> transactions
>>
>>         I think Spark is trying to re-use the connection to geode, in
>> second batch when connection is already closed it raise exceptions
>>
>> 2/ I comment out the //STEP 4 (cache.close())
>>               The spark job running BUT THE DATA IS MISSING RANDOMLY
>>
>> 3/ I'm writing stanalone application to repeat step 1 -> 4 in while loop
>>               while(true) {
>>                    //STEP 1,
>>                    //STEP 2
>>                    //STEP 3
>>                    //STEP 4
>>               }
>>
>>           The application is running well without exception or loosing
>> data.
>>
>>
>> Any thoughts is really appriciated
>>
>>
>>
>>
>> --
>> Thanks
>> Kien
>>
>
>
>
> --
> -John
> john.blum10101 (skype)
>
-- 
Thanks
Kien

Re: Spark streaming output to geode

Posted by John Blum <jb...@pivotal.io>.
Hi Trung-

You definitely should not be opening and closing a connection each time you
process a record; that is excessive.

I can tell you right now that, between clientCache.close(), say, in the
first iteration, and ClientCacheFactory.create() in the second iteration,
it is unlikely that Geode completely "closed" the ClientCache instance
(releasing all the resources) before the ClientCache instance is recreated
in the subsequent iteration, hence the reason you hit a CacheClosedException
or RegionExistsException.

I have run into similar problems in a single test class, where I need to
cycle the cache instance between test case methods in the same test class
(suite).

Having said that, it would be better to structure the code as...

ClientCache cache = new ClientCacheFactory()
    .addPoolLocator(locator, 10334)
    .set("log-level", "WARN")
    .create();

Region< String, Transaction > region =
    cache.<String, Transaction >createClientRegionFactory(
ClientRegionShortcut.PROXY).create("transactions");

try {

    results.foreachRDD(
        rdd -> rdd.foreachPartition(
            records -> {
                processing(region,records)
            }
        )
    );
}
finally {
    cache.close();
}

Why do you need to open/close the connection for each record/batch anyhow?

Regards,
-John


On Mon, Jul 16, 2018 at 11:15 PM, trung kien <ki...@gmail.com> wrote:

> Dear all,
>
> I am looking for a correct way to write output of a spark streaming job to
> a geode's region
>
> I'm trying following codes:
>
> results.foreachRDD(
>                     rdd -> rdd.foreachPartition(
>                             records ->
>                             {
>                                     //STEP 1
>                                     ClientCache cache = new
> ClientCacheFactory().addPoolLocator(locator, 10334)
>                                     .set("log-level", "WARN").create();
>
>                                     //STEP 2
>                                     Region< String, Transaction > region
> = cache.<String, Transaction >createClientRegionFactory(
> ClientRegionShortcut.PROXY).create("transactions");
>
>                                     //STEP 3
>                                     processing(region,records)
>
>                                     //STEP 4
>                                     cache.close();
>                             }
>                     )
>                 );
>
> 1/ If I'm using above code, then I will get following errors:
>                org.apache.geode.cache.CacheClosedException: The cache is
> closed.
>                org.apache.geode.cache.RegionExistsException: /transactions
>
>         I think Spark is trying to re-use the connection to geode, in
> second batch when connection is already closed it raise exceptions
>
> 2/ I comment out the //STEP 4 (cache.close())
>               The spark job running BUT THE DATA IS MISSING RANDOMLY
>
> 3/ I'm writing stanalone application to repeat step 1 -> 4 in while loop
>               while(true) {
>                    //STEP 1,
>                    //STEP 2
>                    //STEP 3
>                    //STEP 4
>               }
>
>           The application is running well without exception or loosing
> data.
>
>
> Any thoughts is really appriciated
>
>
>
>
> --
> Thanks
> Kien
>



-- 
-John
john.blum10101 (skype)