Posted to user@ignite.apache.org by Anil <an...@gmail.com> on 2017/02/04 09:01:01 UTC

EntryProcessor for cache

Hi,

I have two caches, Person and PersonDetail. I have to update one property,
the status of PersonDetail, based on Person and PersonDetail properties
and the current date.

Person {

String personId

String equivalentId

String name

Long dateOfBirth;
....

}

PersonDetail {

String detailedId

String equivalentId

Long startDate

Long endDate;

String status

}

Person cache key -> AffinityKey<String, String> -> AffinityKey<PersonId,
equivalentId>
PersonDetail cache key -> AffinityKey<String, String> ->
AffinityKey<DetailId, equivalentId>


The status of a PersonDetail is determined from Person#dateOfBirth,
PersonDetail#startDate, PersonDetail#endDate and the current date.


As far as I can see, an entry processor can be applied only to a given set
of cache keys. Is there an efficient way to update the status of every
PersonDetail?

Can we use map-reduce? If yes, could you please share an example?

Thanks for your help.

Thanks.

Re: EntryProcessor for cache

Posted by Andrey Mashenkov <an...@gmail.com>.
Hi Anil,

ignite.affinity("PERSON_CACHE").partitions() returns the total number of
partitions for the given cache.
You may want to iterate over the local primary partitions instead:
ignite.affinity("PERSON_CACHE").primaryPartitions(ignite.cluster().localNode())
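To make the "one scan per local partition, in parallel" idea concrete, here is a rough plain-Java model of it. It deliberately uses no Ignite classes, so it runs on its own: the map stands in for the cache, and the comments name the Ignite calls each step would correspond to (Affinity.primaryPartitions(...) and ScanQuery.setPartition(...)). The class and method names (PartitionScanSketch, scanInParallel, scanPartition) are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java model of running one scan per local primary partition in parallel.
// No Ignite classes are used; all names here are hypothetical.
class PartitionScanSketch {

    // Visits every row of one partition and returns how many rows were seen.
    // In Ignite this would iterate cache.query(new ScanQuery<K, V>().setPartition(part)).
    static int scanPartition(int part, Map<Integer, List<String>> rowsByPartition) {
        int visited = 0;
        for (String row : rowsByPartition.getOrDefault(part, Collections.<String>emptyList())) {
            visited++; // per-row work, such as the status update, would go here
        }
        return visited;
    }

    // Submits one scan task per partition to a fixed pool and sums the row counts.
    static int scanInParallel(int[] partitions, Map<Integer, List<String>> rowsByPartition, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> pending = new ArrayList<>();
            for (int part : partitions) {
                Callable<Integer> task = () -> scanPartition(part, rowsByPartition);
                pending.add(pool.submit(task));
            }
            int total = 0;
            for (Future<Integer> f : pending)
                total += f.get(); // wait for each partition scan to finish
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Partition scan failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> data = new HashMap<>();
        data.put(0, Arrays.asList("p1", "p2"));
        data.put(3, Arrays.asList("p3"));
        data.put(7, Arrays.asList("p4", "p5", "p6"));
        // Stand-in for affinity.primaryPartitions(ignite.cluster().localNode())
        int[] localPrimary = {0, 3, 7};
        System.out.println(scanInParallel(localPrimary, data, 3));
    }
}
```

Each partition is scanned by exactly one task, so no two threads touch the same rows; how many threads to use per node is a tuning choice this sketch does not try to answer.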

On Fri, Feb 17, 2017 at 11:47 AM, Anil <an...@gmail.com> wrote:

> Hi Andrey,
>
> Thanks. this looks promising and will try that.
>
> the only way to get the partitions is ignite.affinity("PERSON_CACHE").partitions().
> is that holds for non affinity cache ?
>
> Thanks


-- 
Best regards,
Andrey V. Mashenkov

Re: EntryProcessor for cache

Posted by Anil <an...@gmail.com>.
Hi Andrey,

Thanks. This looks promising; I will try it.

Is ignite.affinity("PERSON_CACHE").partitions() the only way to get the
partitions? Does it also hold for a non-affinity cache?

Thanks

On 17 February 2017 at 10:39, Andrey Mashenkov <an...@gmail.com>
wrote:

> Hi Anil,
>
> Most likely, your query takes long time due to SQL query is running in
> single thread. The only workaround for now is to add more nodes.
>
> However, query is quite simple, so you can run ScanQuery per partition in
> parallel manner for iterating over PERSON_CACHE.

Re: EntryProcessor for cache

Posted by Andrey Mashenkov <an...@gmail.com>.
Hi Anil,

Most likely, your query takes a long time because the SQL query runs in a
single thread. The only workaround for now is to add more nodes.

However, the query is quite simple, so you can run a ScanQuery per partition
in parallel to iterate over PERSON_CACHE.


On Fri, Feb 17, 2017 at 5:29 AM, Anil <an...@gmail.com> wrote:

> Hi Andrey,
>
> Yes. index is available on eqId of PersonDetail object.
>
> Query says scan for Person cache not the PersonDetail cache.
>
> and i think the  above  Computask executed by only one thread and  not by
> number of threads on number of partitions. Can parallelism achieved here ?
>
> Thanks.


-- 
Best regards,
Andrey V. Mashenkov

Re: EntryProcessor for cache

Posted by Anil <an...@gmail.com>.
Hi Andrey,

Yes, an index is available on eqId of the PersonDetail object.

The query plan shows a scan of the Person cache, not the PersonDetail cache.

Also, I think the ComputeTask above is executed by only one thread, not by
one thread per partition. Can parallelism be achieved here?

Thanks.



On 17 February 2017 at 02:32, Andrey Mashenkov <an...@gmail.com>
wrote:

> Hi Anil,
>
> 1. Seems, some node enter to topology, but cannot finish partition map
> exchange operations due to long running transtaction or smth holds lock on
> a partition.
>
> 2.     /* PERSON_CACHE.PERSON.__SCAN_ */ says that no indices is used for
> this query and sull scan will be performed.Do you have an index on PersonDetail.eqId
> field?

Re: EntryProcessor for cache

Posted by Andrey Mashenkov <an...@gmail.com>.
Hi Anil,

1. It seems that some node has joined the topology but cannot finish the
partition map exchange because a long-running transaction or something else
holds a lock on a partition.

2. /* PERSON_CACHE.PERSON.__SCAN_ */ says that no index is used for this
query and a full scan will be performed. Do you have an index on the
PersonDetail.eqId field?

On Thu, Feb 16, 2017 at 6:50 PM, Anil <an...@gmail.com> wrote:

> Hi Val,
>
> I have created ComputeTask which updates which scans the local cache and
> updates its information to child records in another cache. Both caches are
> collocated so that parent and child records fall under node and partition.
>
> 1. I see following warning in the logs when compute task is running -
>
>  GridCachePartitionExchangeManager:480 - Failed to wait for partition map
> exchange [topVer=AffinityTopologyVersion [topVer=6, minorTopVer=0],
> node=c7a3957b-a3d0-4923-8e5d-e95430c7e66e]. Dumping pending objects that
> might be the cause:
>
> Should I worry about this warning ? what could be the reason for this
> warning.
>
> 2.
>
> QueryCursor<Entry<String, Person>> cursor = cache.query(new
> SqlQuery<String, Person>(Person.class, "select * from Person").*setLocal(*
> *true**)*);
>
>
>
>   for (Entry<String, Person> row : cursor) {
>
>        String eqId =   row.getValue().getEqId(); //(String) row.get(0);
>
>        QueryCursor<Entry<AffinityKey<String>, PersonDetail>> dCursor =
>
>                                                  detailsCache.query(new
> SqlQuery<AffinityKey<String>, PersonDetail>(PersonDetail.class,
>
>
>
>                 "select * from DETAIL_CACHE.PersonDetail  where eqId = ?").
> *setLocal(true)*.setArgs(eqId));
>
>          for (Entry<AffinityKey<String>, PersonDetail> d : dCursor) {
>
>                // add person info to person detail and add to person
> detail data streamer.
>
>             }
>
>
>      }
>
>
> I see (in logs) that query is taking long time -
>
>
> Query execution is too long [time=23309 ms, sql='SELECT
> "PERSON_CACHE".Person._key, "PERSON_CACHE".PERSON._val from Person', plan=
>
> SELECT
>
>     PERSON_CACHE.PERSON._KEY,
>
>     PERSON_CACHE.PERSON._VAL
>
> FROM PERSON_CACHE.PERSON
>
>     /* PERSON_CACHE.PERSON.__SCAN_ */
>
> , parameters=[]]
>
> any issues with the above approach ? thanks.
>
>
> Thanks.
>
>
> On 11 February 2017 at 04:18, vkulichenko <va...@gmail.com>
> wrote:
>
>> Looks ok except that the first query should also be local I guess. Also
>> note
>> that you used split adapter, so didn't actually map the jobs to nodes,
>> leaving this to Ignite. This means that there is a chance some nodes will
>> get more than one job, and some none of the jobs. Round robin balancing is
>> used by default, so this should not happen, at least on stable topology,
>> but
>> theoretically there is no guarantee. Use map method instead to manually
>> map
>> jobs to nodes, or just use broadcast() method.
>>
>> Jobs are executed in parallel in the public thread pool.
>>
>> -Val
>>
>>
>>
>> --
>> View this message in context: http://apache-ignite-users.705
>> 18.x6.nabble.com/EntryProcessor-for-cache-tp10432p10559.html
>> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>>
>
>


-- 
Best regards,
Andrey V. Mashenkov

Re: EntryProcessor for cache

Posted by Anil <an...@gmail.com>.
Hi Val,

I have created a ComputeTask that scans the local cache and writes its
information to the child records in another cache. Both caches are
collocated, so parent and child records fall on the same node and partition.

1. I see the following warning in the logs while the compute task is running:

 GridCachePartitionExchangeManager:480 - Failed to wait for partition map
exchange [topVer=AffinityTopologyVersion [topVer=6, minorTopVer=0],
node=c7a3957b-a3d0-4923-8e5d-e95430c7e66e]. Dumping pending objects that
might be the cause:

Should I worry about this warning? What could be the reason for it?

2.

QueryCursor<Entry<String, Person>> cursor = cache.query(
    new SqlQuery<String, Person>(Person.class, "select * from Person").setLocal(true));

for (Entry<String, Person> row : cursor) {
    String eqId = row.getValue().getEqId(); //(String) row.get(0);

    QueryCursor<Entry<AffinityKey<String>, PersonDetail>> dCursor = detailsCache.query(
        new SqlQuery<AffinityKey<String>, PersonDetail>(PersonDetail.class,
            "select * from DETAIL_CACHE.PersonDetail where eqId = ?").setLocal(true).setArgs(eqId));

    for (Entry<AffinityKey<String>, PersonDetail> d : dCursor) {
        // add person info to person detail and add to person detail data streamer.
    }
}


The logs show that the query takes a long time:


Query execution is too long [time=23309 ms, sql='SELECT
"PERSON_CACHE".Person._key, "PERSON_CACHE".PERSON._val from Person', plan=

SELECT
    PERSON_CACHE.PERSON._KEY,
    PERSON_CACHE.PERSON._VAL
FROM PERSON_CACHE.PERSON
    /* PERSON_CACHE.PERSON.__SCAN_ */

, parameters=[]]

Are there any issues with the above approach?


Thanks.


On 11 February 2017 at 04:18, vkulichenko <va...@gmail.com>
wrote:

> Looks ok except that the first query should also be local I guess. Also
> note
> that you used split adapter, so didn't actually map the jobs to nodes,
> leaving this to Ignite. This means that there is a chance some nodes will
> get more than one job, and some none of the jobs. Round robin balancing is
> used by default, so this should not happen, at least on stable topology,
> but
> theoretically there is no guarantee. Use map method instead to manually map
> jobs to nodes, or just use broadcast() method.
>
> Jobs are executed in parallel in the public thread pool.
>
> -Val
>
>
>
> --
> View this message in context: http://apache-ignite-users.
> 70518.x6.nabble.com/EntryProcessor-for-cache-tp10432p10559.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>

Re: EntryProcessor for cache

Posted by vkulichenko <va...@gmail.com>.
Looks OK, except that I guess the first query should also be local. Also note
that you used the split adapter, so you didn't actually map the jobs to nodes
and left this to Ignite. This means there is a chance that some nodes will
get more than one job and some will get none. Round-robin balancing is
used by default, so this should not happen, at least on a stable topology, but
theoretically there is no guarantee. Use the map method instead to manually map
jobs to nodes, or just use the broadcast() method.

Jobs are executed in parallel in the public thread pool.

-Val



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/EntryProcessor-for-cache-tp10432p10559.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: EntryProcessor for cache

Posted by Anil <an...@gmail.com>.
Also, does a compute task execute in parallel across the partitions on each
node (like an entry processor)? Thanks.

On 10 February 2017 at 10:52, Anil <an...@gmail.com> wrote:

> Hi Val,
>
> i have attached the code. please let me know if you see any issues with
> approach. thanks.
>
> Thanks.

Re: EntryProcessor for cache

Posted by Anil <an...@gmail.com>.
Hi Val,

I have attached the code. Please let me know if you see any issues with
the approach.

Thanks.

On 10 February 2017 at 02:16, vkulichenko <va...@gmail.com>
wrote:

> Anil,
>
> What exactly did you try and what didn't work? Can you show your code?
>
> -Val
>
>
>
> --
> View this message in context: http://apache-ignite-users.
> 70518.x6.nabble.com/EntryProcessor-for-cache-tp10432p10532.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>

Re: EntryProcessor for cache

Posted by vkulichenko <va...@gmail.com>.
Anil,

What exactly did you try and what didn't work? Can you show your code?

-Val



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/EntryProcessor-for-cache-tp10432p10532.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: EntryProcessor for cache

Posted by Anil <an...@gmail.com>.
Hi Val,

I tried to write a compute task but had no luck.

Could someone share pseudocode for updating a cache using a compute task? Thanks.

Thanks

On 8 February 2017 at 03:03, vkulichenko <va...@gmail.com>
wrote:

> Anil,
>
> I don't think there is such an example in particular. Just implement
> ComputeTask, access cache(s) in compute jobs and work with local data. Then
> reduce in reduce() method to get final result. Other particular details
> depend on your use case.
>
> -Val
>
>
>
> --
> View this message in context: http://apache-ignite-users.
> 70518.x6.nabble.com/EntryProcessor-for-cache-tp10432p10490.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>

Re: EntryProcessor for cache

Posted by vkulichenko <va...@gmail.com>.
Anil,

I don't think there is such an example in particular. Just implement
ComputeTask, access the cache(s) in the compute jobs and work with local data.
Then combine the job results in the reduce() method to get the final result.
Other details depend on your use case.

-Val



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/EntryProcessor-for-cache-tp10432p10490.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: EntryProcessor for cache

Posted by Anil <an...@gmail.com>.
Hi Val,

Thanks. Could you please point me to a map-reduce example that runs on a cache?

thanks.

On 7 February 2017 at 05:47, vkulichenko <va...@gmail.com>
wrote:

> Yes, you can create a map reduce task, execute local query within each job
> (use Query.setLocal(true)), and update queried entries accordingly.
>
> Also note that each SQL table in Ignite has predefined _key field that
> returns key object, so you can return set of keys from the query.
>
> -Val
>
>
>
> --
> View this message in context: http://apache-ignite-users.
> 70518.x6.nabble.com/EntryProcessor-for-cache-tp10432p10462.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>

Re: EntryProcessor for cache

Posted by vkulichenko <va...@gmail.com>.
Yes, you can create a map-reduce task, execute a local query within each job
(use Query.setLocal(true)), and update the queried entries accordingly.

Also note that each SQL table in Ignite has a predefined _key field that
returns the key object, so you can return a set of keys from the query.
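As a rough illustration of the "update queried entries accordingly" step, here is a plain-Java model of what a single job could do with the rows its local queries return. It uses no Ignite classes. The two lists stand in for the local query results, and the status rule in statusFor (INVALID / PENDING / EXPIRED / ACTIVE) is purely hypothetical, since the actual rule is not given in this thread. All class and method names are made up for the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the work one job does on its local data. No Ignite classes are used.
class StatusUpdateSketch {

    // Only the fields from the original post that the update needs.
    static class Person {
        final String equivalentId;
        final long dateOfBirth;

        Person(String equivalentId, long dateOfBirth) {
            this.equivalentId = equivalentId;
            this.dateOfBirth = dateOfBirth;
        }
    }

    static class PersonDetail {
        final String equivalentId;
        final long startDate;
        final long endDate;
        String status;

        PersonDetail(String equivalentId, long startDate, long endDate, String status) {
            this.equivalentId = equivalentId;
            this.startDate = startDate;
            this.endDate = endDate;
            this.status = status;
        }
    }

    // HYPOTHETICAL rule: the thread never says how the status is actually derived.
    static String statusFor(long dateOfBirth, long startDate, long endDate, long now) {
        if (startDate < dateOfBirth) return "INVALID";
        if (now < startDate) return "PENDING";
        if (now > endDate) return "EXPIRED";
        return "ACTIVE";
    }

    // Joins details to persons by equivalentId, rewrites stale statuses,
    // and returns the details whose status changed.
    static List<PersonDetail> updateStatuses(List<Person> persons, List<PersonDetail> details, long now) {
        Map<String, Long> birthByEquivalentId = new HashMap<>();
        for (Person p : persons)
            birthByEquivalentId.put(p.equivalentId, p.dateOfBirth);

        List<PersonDetail> changed = new ArrayList<>();
        for (PersonDetail d : details) {
            Long dob = birthByEquivalentId.get(d.equivalentId);
            if (dob == null)
                continue; // no collocated parent found; leave the detail untouched
            String status = statusFor(dob, d.startDate, d.endDate, now);
            if (!status.equals(d.status)) {
                d.status = status;
                changed.add(d); // in Ignite these would be written back, e.g. through a data streamer
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        List<Person> persons = new ArrayList<>();
        persons.add(new Person("eq-1", 100L));
        List<PersonDetail> details = new ArrayList<>();
        details.add(new PersonDetail("eq-1", 200L, 300L, "PENDING"));
        details.add(new PersonDetail("eq-2", 200L, 300L, "PENDING"));
        System.out.println(updateStatuses(persons, details, 250L).size());
        System.out.println(details.get(0).status);
    }
}
```

Returning only the changed details keeps the write-back small; a job could equally return just their keys, which is where the _key field mentioned above would come in.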

-Val



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/EntryProcessor-for-cache-tp10432p10462.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: EntryProcessor for cache

Posted by Anil <an...@gmail.com>.
Has anyone had a chance to look into this? Thanks.

On 4 February 2017 at 14:31, Anil <an...@gmail.com> wrote:

> HI,
>
> I have two caches Person and PersonDetail. i have to update the one of the
> property person details status based on person and person detail properties
> and current date.
>
> Person {
>
> String personId
>
> String equivalentId
>
> String name
>
> Long dateOfBirth;
> ....
>
> }
>
> PersonDetail {
>
> String detailedId
>
> String equivalentId
>
> Long startDate
>
> Long endDate;
>
> String status
>
> }
>
> Person cache key -> AffinityKey<String, String> -> AffinityKey<PersonId,
> equivalentId>
> PersonDetail cache key -> AffinityKey<String, String> ->
> AffinityKey<DetailId, equivalentId>
>
>
> status of Person details is determined based on Person#dateOfBirth,
> PersonDetail#startDate, PersonDetail#endDate and current date.
>
>
> I see entry processor can nbe applied on given set of cache keys only. Is
> there any way in update each PersonDetail's status in efficient way ?
>
> Can we use mapreduce ? if yes, Could you please share any example ?
>
> Thanks for your help.
>
> Thanks.
>