Posted to user@ignite.apache.org by camer314 <ca...@towerswatson.com> on 2019/11/18 01:49:17 UTC

How to perform distributed compute in similar way to Spark vector UDF

I asked this question on StackOverflow:
<https://stackoverflow.com/questions/58759138/apache-ignite-analogue-of-spark-vector-udf-and-distributed-compute-in-general/58766331#58766331>

However, I probably put too much weight on Spark.

My question really is: how can I load a large CSV file into the cache and
send compute actions to the nodes that work in a similar way to a Pandas UDF?
That is, they work on a subset of the data (rows).

In Ignite, I imagine I could load the CSV into a cache using PARTITIONED mode
and then use affinity compute to send functions to the nodes where the data
is, so each node processes only the data that exists on it. This seems like
a nice way to go: each node always processes locally, and the results of
those actions would be added back to the cache, so presumably they would
only be added locally as well.

However, I am not entirely sure how the partitioning works. The examples for
affinity show the use of a single key value.

Is there a way to load a CSV into a cache in PARTITIONED mode, so that Ignite
distributes it evenly across the grid, and then run a compute job on every
node that works ONLY with the data in its own cache, so that I won't need to
care about keys?

For example, imagine a CSV file that is a matrix of numbers. My distributed
cache would really be a dataframe representation of that file. For argument's
sake, let's say my cache is keyed by an incrementing ID, with the data being
an array of doubles, and the column names are A, B, C.

That ID key is really pretty irrelevant. It is meaningless to my
application.

Now let's say I wanted to perform the same maths on every row in that
dataframe, with the results being a new column in the cache.

If that formula was D = A * B * C then D becomes a new column.

Ignoring Spark SQL, in Spark I could easily write a UDF that creates column
D by passing columns [A,B,C]. Spark doesn't care about keys or ID columns in
this instance; it just gives you a vector of data and you return a vector of
results.

So in Ignite, how can I replicate that behaviour most elegantly in code
(.NET), sending compute to the grid that collectively processes all rows
without caring about the keys?



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: How to perform distributed compute in similar way to Spark vector UDF

Posted by Denis Magda <dm...@apache.org>.
If you need to traverse the local data on all the nodes, then broadcast
a compute task to all of them and use a ScanQuery with the setLocal flag set
to true.

Also, you can balance the load by taking a similar approach with an
affinity call per partition:
https://www.gridgain.com/docs/latest/developers-guide/collocated-computations#collocating-by-partition

The benefit of the affinity-based methods of the compute API is that a
partition is locked and won't be evicted until the computation has
finished. Without that lock, a partition can be evicted when the cluster
topology has changed, the partition has been rebalanced to another node, and
it now needs to be removed from the node the compute is running on.

-
Denis
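
To make the per-partition idea above concrete, here is a minimal plain-Java simulation. It uses no Ignite classes at all: the HashMap stands in for the cache, partitionOf is a made-up stand-in for a real affinity function, and every class and method name is hypothetical. It only illustrates the shape of the approach, where each partition's rows are handled by one job that appends D = A * B * C, and the caller never deals with individual keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation only: no Ignite classes are used, and all names are hypothetical.
final class PartitionLocalCompute {
    /** Assigns a key to one of `partitions` buckets (a stand-in for a real affinity function). */
    static int partitionOf(long key, int partitions) {
        return Math.floorMod(Long.hashCode(key), partitions);
    }

    /** Groups row keys by partition so that each group can be processed on its own. */
    static Map<Integer, List<Long>> keysByPartition(Map<Long, double[]> cache, int partitions) {
        Map<Integer, List<Long>> groups = new TreeMap<>();
        for (Long key : cache.keySet())
            groups.computeIfAbsent(partitionOf(key, partitions), p -> new ArrayList<>()).add(key);
        return groups;
    }

    /** The per-partition job: appends D = A * B * C to every row of one partition. */
    static void addProductColumn(Map<Long, double[]> cache, List<Long> keys) {
        for (Long key : keys) {
            double[] row = cache.get(key); // {A, B, C}
            double d = row[0] * row[1] * row[2];
            cache.put(key, new double[] {row[0], row[1], row[2], d});
        }
    }

    /** Runs the per-partition job once for every partition that holds data. */
    static void computeAll(Map<Long, double[]> cache, int partitions) {
        for (List<Long> keys : keysByPartition(cache, partitions).values())
            addProductColumn(cache, keys);
    }
}
```

In a real cluster the loop in computeAll would be replaced by one compute job per partition (or one broadcast job per node), each running where its data lives; here everything runs in a single process purely for illustration.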


On Sun, Nov 17, 2019 at 7:43 PM camer314 <ca...@towerswatson.com>
wrote:

> Reading a little more in the Java docs about AffinityKey, I am thinking
> that,
> much like vector UDF batch sizing, one way I could easily achieve my result
> is to batch my rows into affinity keys. That is, for every 100,000 rows the
> affinity key changes for example.
>
> So cache keys [0...99999] have affinity key 0, keys [100000...199999] have
> affinity key 1 etc?
>
> If that is the case, may I suggest you update the .NET documentation for
> Data Grid regarding Affinity Colocation as it does not mention the use of
> AffinityKey or go into anywhere near as much detail as the Java docs.

RE: Re: How to perform distributed compute in similar way to Spark vector UDF

Posted by Alexandr Shapkin <le...@gmail.com>.
Hello!

Have you tried the ML package for Apache Ignite [1]?
It definitely should use data frames internally.

> So cache keys [0...99999] have affinity key 0, keys [100000...199999] have
> affinity key 1 etc?

Sure, the easiest way would be to add a new field to your model, set to
rowNumber / itemsPerPartition:

[AffinityKeyMapped] public int AffKey { get; set; }
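
As a small illustration of the batching arithmetic discussed here, the following plain-Java sketch maps a zero-based row number to a batch id by integer division. It does not use any Ignite classes; the class and method names are hypothetical, and the batch size of 100,000 is simply the figure from the question.

```java
// Plain-Java sketch of the batching arithmetic; no Ignite classes are used.
final class AffinityBatching {
    /** Hypothetical helper: maps a zero-based row number to the id of its batch. */
    static long affinityKey(long rowNumber, long itemsPerPartition) {
        if (rowNumber < 0 || itemsPerPartition <= 0)
            throw new IllegalArgumentException("rowNumber must be >= 0 and itemsPerPartition > 0");
        return rowNumber / itemsPerPartition; // integer division groups consecutive rows
    }

    public static void main(String[] args) {
        long batch = 100_000;
        System.out.println(affinityKey(0, batch));       // prints 0
        System.out.println(affinityKey(99_999, batch));  // prints 0
        System.out.println(affinityKey(100_000, batch)); // prints 1
    }
}
```

Rows that share a batch id would then share an affinity key, so the batch size controls how many rows are kept together.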

Also, you may refer to GridGain’s documentation [2].
It’s compatible with Apache Ignite.

[1] - <https://apacheignite.readme.io/docs/ml-partition-based-dataset>
[2] - <https://www.gridgain.com/docs/latest/developers-guide/data-modeling/affinity-collocation#configuring-affinity-key>

Re: How to perform distributed compute in similar way to Spark vector UDF

Posted by camer314 <ca...@towerswatson.com>.
Reading a little more in the Java docs about AffinityKey, I am thinking that,
much like vector UDF batch sizing, one way I could easily achieve my result
is to batch my rows into affinity keys. That is, for every 100,000 rows the
affinity key changes for example.

So cache keys [0...99999] have affinity key 0, keys [100000...199999] have
affinity key 1 etc?

If that is the case, may I suggest you update the .NET documentation for
Data Grid regarding Affinity Colocation as it does not mention the use of
AffinityKey or go into anywhere near as much detail as the Java docs.





