Posted to user@ignite.apache.org by pragmaticbigdata <am...@gmail.com> on 2016/10/27 14:56:36 UTC

Apache Spark & Ignite Integration

I am trying out the integration of Ignite with Spark, and I have a few
questions about how to integrate them and what advantages we can get from
the integration.

1. How can I convert an IgniteRDD (fetched using the IgniteContext) to a
Spark dataset?
2. Once it is converted into a Spark dataset, how can I update the underlying
Ignite cache after I am done with the Spark processing? I understand that
Ignite provides the savePairs API for updating an IgniteRDD; what would its
counterpart be for Spark datasets?
3. One of the advantages of Ignite over Spark is that it supports SQL
indexes. Would a Spark dataset take advantage of these indexes when queries
are executed on it? Do I need to do anything explicit for that other than
enabling the indexes while defining the IgniteConfiguration?

Thanks!




Re: Apache Spark & Ignite Integration

Posted by pragmaticbigdata <am...@gmail.com>.
Thanks for sharing the jira ticket.

Do you have any input on the additional questions I asked about the shared
RDD implementation? They aren't related to the dataframe/dataset support.
Looking forward to your thoughts.




Re: Apache Spark & Ignite Integration

Posted by Denis Magda <dm...@gridgain.com>.
Hi,

Here is the ticket:
https://issues.apache.org/jira/browse/IGNITE-3084

Feel free to paste your questions there as well so that the implementer takes them into account.

—
Denis

> On Nov 14, 2016, at 6:14 AM, pragmaticbigdata <am...@gmail.com> wrote:
> 
> Ok. Is there a jira task that I can track for the dataframes and datasets
> support?
> 
> I do have a couple of follow-up questions to understand the in-memory
> representation of the shared RDD support that Ignite brings with the Spark
> integration.
> 
> 1. Could you detail how shared RDDs are implemented when Ignite is deployed
> in standalone mode? Assuming we have an Ignite cluster containing a cache
> named "partitioned", would creating an IgniteRDD through val
> sharedRDD: IgniteRDD[Int,Int] = ic.fromCache("partitioned") create another
> copy of the cache on the Spark executor JVM, or would the Spark executor
> operate on the original copy of the cache that is present on the Ignite
> nodes? I am mostly interested in understanding the performance impact of
> data shuffling or movement, if there is any.
> 
> 2. Since Spark does not have transaction support, how can I use the ACID
> transaction support that Ignite provides when updating RDDs? A code example
> would be helpful if possible.
> 
> Thanks.


Re: Apache Spark & Ignite Integration

Posted by vkulichenko <va...@gmail.com>.
First of all, I'm not sure how this works with Dataframes. Since we don't
have Dataframe support yet, only RDDs, using Dataframes can potentially
behave in unexpected ways (I don't have enough Spark expertise to tell
whether this is the case or not). The only way to check is to create tests.

Other than that, just keep in mind that IgniteRDD is basically another API
for an Ignite cache, i.e. everything that is true for caches is true here.
In particular, answering your questions:

a. It depends on what you use as keys. If a pair is saved with the same key
twice, the second write overwrites the first one.
b. Ignite works in a concurrent fashion without locking the world. While you
save new data, you can still read it and run queries.
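
To illustrate point (a), here is a minimal sketch (it assumes the
SparkContext sc and the IgniteContext ic from the other snippets in this
thread, and the values are made up):

         // Saving a pair under an existing key behaves like a cache put:
         // the second write simply overwrites the first.
         val shared: IgniteRDD[Int, String] = ic.fromCache("partitioned")

         shared.savePairs(sc.parallelize(Seq(1 -> "first")))
         shared.savePairs(sc.parallelize(Seq(1 -> "second")))  // overwrites key 1

         // The cache still holds a single entry for key 1, with value "second".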

-Val




Re: Apache Spark & Ignite Integration

Posted by pragmaticbigdata <am...@gmail.com>.
Sure.

1. The first diagram is about the data visibility aspect of the Spark
integration. Given that a cache exists on the Ignite nodes, Spark creates a
data frame from the IgniteRDD and performs an action (df.show()) on it. If
changes are concurrently made to the cache on the Ignite nodes (either by
another Spark application or by another application using the Ignite API),
the question is: would the Spark worker see those changes? My understanding,
based on our discussion so far, is that the df.show() action would not
display the latest changes in the cache, since the underlying IgniteRDD
might be updated but the dataframe is another layer above it.

2. The second diagram is about the locking and concurrency behavior of the
Spark integration. Given that a cache exists on the Ignite nodes, Spark
creates a data frame from the IgniteRDD and adds a new column to the data
(the email column in the diagram). If changes are concurrently made to the
cache (either by another Spark application or by another application using
the Ignite API) on the Ignite nodes, the questions are:
a. What happens when Spark tries to persist the RDD back to the Ignite cache
through the savePairs() API? Would the changes made previously to the Ignite
cache be lost?
b. What is the locking behavior when updating the Ignite cache? Would it
lock all the partitions of the cache, preventing read/write access to the
cache, or can Ignite determine the partitions that are going to be updated
and lock only those?

Thanks.




Re: Apache Spark & Ignite Integration

Posted by vkulichenko <va...@gmail.com>.
Hi,

To be honest, I don't quite understand these diagrams :) Could you add some
comments?

-Val




Re: Apache Spark & Ignite Integration

Posted by pragmaticbigdata <am...@gmail.com>.
I have tried to translate my understanding into these two images. Kindly let
me know whether the diagrams correctly depict the Ignite-Spark integration in
terms of data visibility and persistence.

<http://apache-ignite-users.70518.x6.nabble.com/file/n9393/Pbq53oL.png> 

<http://apache-ignite-users.70518.x6.nabble.com/file/n9393/q7e83SI.png> 




Re: Apache Spark & Ignite Integration

Posted by vkulichenko <va...@gmail.com>.
This is true, and in my view this is the nature of an RDD, which is mostly
intended to be read-only. As for embedded mode, in my understanding it's more
for testing. In this mode server nodes are started on the executors, which I
guess are stopped when the application is stopped. If so, you lose the whole
purpose of the shared RDD.

-Val




Re: Apache Spark & Ignite Integration

Posted by pragmaticbigdata <am...@gmail.com>.
Thanks for the follow-up.


> Data will be in sync because it's stored in the Ignite cache. IgniteRDD
> uses the Ignite API to update it, and you can do the same in your own code.
> 
> There is no copy of the data maintained in Spark; it's always stored in
> the Ignite caches. Spark runs Ignite client(s) that fetch the data for
> computation, but it doesn't store it.

I think I missed clarifying what I meant in my earlier comment. When I said
that "I will have to discard the Spark rdd/dataset/dataframe every time the
data is updated in Ignite through the Ignite API", what I also meant was that
I could not cache the dataset in Spark's memory for future transformations
(using the dataset.cache() Spark API), because if the Ignite cache gets
updated simultaneously by another user, my dataset in Spark would be stale.
This happens because Spark acts as an Ignite client and fetches the data,
instead of a tighter integration in which Spark could work with the same copy
of the data on the Ignite server.

If what I have understood is true, I want to confirm that the behavior is no
different when Ignite runs in embedded mode with Spark. Kindly let me know.
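
To make the concern concrete, a sketch (assuming a Spark 2.x session spark,
the IgniteContext ic from the earlier snippets, and the usual RDD-to-Dataset
conversion):

         import spark.implicits._

         val shared: IgniteRDD[Int, String] = ic.fromCache("partitioned")
         val ds = shared.toDS()
         ds.cache()      // pins a snapshot of the Ignite data in Spark's memory

         // ... another client updates the "partitioned" cache via the Ignite API ...

         ds.count()      // served from Spark's cached snapshot: the update is not visible
         ds.unpersist()  // drop the snapshot so the next action re-reads from Ignite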




Re: Apache Spark & Ignite Integration

Posted by vkulichenko <va...@gmail.com>.
pragmaticbigdata wrote
> Yes, but the data would not be in sync when both (updates and analytics)
> are done concurrently, right? I will have to discard the Spark
> rdd/dataset/dataframe every time the data is updated in Ignite through the
> Ignite API. As I understand it, the data remains in sync only when we use
> the IgniteRDD API. Correct me if my understanding is wrong.

Data will be in sync because it's stored in the Ignite cache. IgniteRDD uses
the Ignite API to update it, and you can do the same in your own code.

pragmaticbigdata wrote
> I have an additional question on the same topic: even when Ignite runs in
> embedded mode with Spark, the memory footprint behavior is the same as it
> is when Ignite runs in standalone mode, right? I.e. when Spark fetches the
> Ignite cache through the IgniteRDD API (val igniteRDD =
> igniteContext.fromCache("<cache-name>")), a copy of the data is created in
> the Spark worker's memory.

There is no copy of the data maintained in Spark; it's always stored in the
Ignite caches. Spark runs Ignite client(s) that fetch the data for
computation, but it doesn't store it.
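
For instance, a sketch (reusing the IgniteContext ic from the earlier
snippets; the cache name is illustrative):

         val shared: IgniteRDD[Int, String] = ic.fromCache("partitioned")

         // Update through the Ignite API, outside of Spark.
         ic.ignite().cache[Int, String]("partitioned").put(1, "updated")

         // Each new action re-reads from the Ignite caches, so it sees the update.
         shared.filter(_._2 == "updated").count()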

-Val




Re: Apache Spark & Ignite Integration

Posted by pragmaticbigdata <am...@gmail.com>.
> use Ignite to update the data in a transactional manner and Spark for
> analytics.

Yes, but the data would not be in sync when both (updates and analytics) are
done concurrently, right? I will have to discard the Spark
rdd/dataset/dataframe every time the data is updated in Ignite through the
Ignite API. As I understand it, the data remains in sync only when we use the
IgniteRDD API. Correct me if my understanding is wrong.

I have an additional question on the same topic: even when Ignite runs in
embedded mode with Spark, the memory footprint behavior is the same as it is
when Ignite runs in standalone mode, right? I.e. when Spark fetches the
Ignite cache through the IgniteRDD API (val igniteRDD =
igniteContext.fromCache("<cache-name>")), a copy of the data is created in
the Spark worker's memory.

Thanks.




Re: Apache Spark & Ignite Integration

Posted by vkulichenko <va...@gmail.com>.
IgniteRDD can be used with any dataset as long as it fits in memory. If you
didn't get performance from pure Ignite, I doubt that Spark will help; it's
Ignite that speeds up Spark, not the other way around :) I believe there are
other reasons for not having acceptable performance.

As for the APIs, you can always combine them: use Ignite to update the data
in a transactional manner and Spark for analytics.

-Val




Re: Apache Spark & Ignite Integration

Posted by pragmaticbigdata <am...@gmail.com>.
a. No, I have not executed any tests. I am doing a theoretical study first
to understand the memory footprint and the data movement between the Spark
and Ignite nodes.

c. So basically there is no use case, when working with Spark (for data
processing) and Ignite (for in-memory data storage), that can benefit from
Ignite transactions. Since Spark is non-transactional, I am trying to
understand how I can use the ACID transaction support of Ignite when
performing data processing in Spark. Let me know if you think otherwise.


> To be honest, I'm not quite sure you should use IgniteRDD, because it
> sounds like your use case is bigger than that (indexed SQL, transactions,
> etc.). Did you consider using the pure Ignite API without integrating it
> with Spark?

Yes, I have performed tests with the pure Ignite API, but the performance
didn't turn out to be good. Additionally, we do have complex requirements in
the analytics space that could be served well by Spark. Are you suggesting
that IgniteRDD (in other words, the Spark integration) should only be used
when the data set is small? From various blogs/articles I understand that
the main benefit of using Spark with Ignite is to easily share data across
Spark jobs, where Ignite is used for in-memory data storage and Spark for
its high-speed data processing capabilities.




Re: Apache Spark & Ignite Integration

Posted by vkulichenko <va...@gmail.com>.
a. Actually, I'm looking at the code and am in doubt now. Did you run any
experiments? What do they show?
b. Correct. The sql() method goes directly to Ignite, bypassing Spark, and
is basically the same as calling IgniteCache.query().
c. savePairs() is a distributed operation on the Spark cluster, so this will
not work. To use transactions you will have to use the Ignite API.

To be honest, I'm not quite sure you should use IgniteRDD, because it
sounds like your use case is bigger than that (indexed SQL, transactions,
etc.). Did you consider using the pure Ignite API without integrating it
with Spark?

-Val




Re: Apache Spark & Ignite Integration

Posted by pragmaticbigdata <am...@gmail.com>.
Appreciate your follow-ups.

a. " Data is stored in Ignite and Spark will fetch data for a particular
partition when you execute something." Does IgniteRDD (i.e. Spark) fetch the
data to the closest Spark node that probably resides on the same server? One
of the earlier responses mention that this is done when new entries are
added to the cache.

a1. Could you please detail on how #a is achieved? I looked at the
IgniteRDD.compute() method implementation which creates a ScanQuery and
makes a call to the affinity api but I didn't follow how does the code
search for the closest ignite node?

b. For igniteRDD.sql() query execution, it seems that the behavior and hence
the performance would be same as executing a sql query on the IgniteCache
from an ignite client node. Is my understanding right? I follow the fact
that the performance would be better when compared to a similar spark SQL
query because of the in-memory indexes.

c. How can I take the advantage of Ignite's ACID transaction support when
doing the data processing in spark? Based on one of the earlier points the
code flow would look like
         val sharedRDD1: IgniteRDD[Int,Int] = ic.fromCache("partitioned")
         val sharedRDD2: IgniteRDD[Int,Int] = ic.fromCache("anotherCache")

         Transaction tx = Ignition.ignite().transactions().txStart()
         sharedRDD1.savePairs(...);
         sharedRDD2.savePairs(...);
         tx.commit()

Is my understanding of the flow correct? If so, how do I maintain
transaction isolation when other spark jobs try to read the data from ignite
in parallel?

Thanks.




Re: Apache Spark & Ignite Integration

Posted by vkulichenko <va...@gmail.com>.
a. Data is stored in Ignite, and Spark will fetch the data for a particular
partition when you execute something. This is done lazily, page by page,
while the iteration is happening. Specifically, IgniteRDD overrides the
compute() method, where it gets a scan query iterator over a partition.
b. When you execute a query using the IgniteRDD.sql() method, you go
directly to Ignite through its SQL engine, completely bypassing Spark. So
everything that is supported in Ignite is available here. User-defined
functions are also available [1].

[1]
https://ignite.apache.org/releases/mobile/org/apache/ignite/cache/query/annotations/QuerySqlFunction.html
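
As an illustration, a minimal sketch of querying through Ignite's SQL engine
from the shared RDD (the cache name "persons" and the Person value class are
hypothetical; Person's queried fields are assumed to be annotated with
@QuerySqlField, and ic is the IgniteContext from the earlier snippets):

         val persons: IgniteRDD[Long, Person] = ic.fromCache("persons")

         // Executed by Ignite's SQL engine (using its indexes), not by
         // Spark SQL; the result comes back as a Spark DataFrame.
         val df = persons.sql("select name, age from Person where age > ?", 30)
         df.show()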

-Val




Re: Apache Spark & Ignite Integration

Posted by pragmaticbigdata <am...@gmail.com>.
Ok.
a. From your comments I understand that there is only one copy of the data,
which resides on the Ignite cluster. The data is not copied to the Spark
nodes while executing the lineage graph consisting of transformations and
actions. If my understanding is correct, what happens when a transformation
is applied to an RDD? Does it create a new cache or just an RDD?
b. One of the advertised features of IgniteRDD is speeding up Spark SQL
queries by as much as 100 times, by using the in-memory indexing
capabilities that Ignite provides. Since the IgniteRDD is created from the
IgniteContext, I assume we can only execute SQL queries (through the
igniteRDD.sql() API) that Ignite can execute, and not arbitrary Spark SQL
queries. Is my understanding right? E.g. can we define user-defined
functions as we do with Spark SQL?

Thanks.





Re: Apache Spark & Ignite Integration

Posted by vkulichenko <va...@gmail.com>.
Hi,

Here are my responses:

1. In standalone mode each executor will run an embedded client node to
access the cluster. If you use the saveValues() method to load the data,
IgniteRDD will generate keys in such a way that values are saved on the
closest server (i.e. the one running on the same box as the executor). This
should minimize data movement.

2. For this you will have to use the Ignite API directly. You can use
IgniteContext.ignite() to get the local client on the driver.
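
For example, a sketch of what #2 could look like on the driver (it assumes
the cache "partitioned" is configured as TRANSACTIONAL; note that on older
ignite-spark versions IgniteContext also takes the cache key/value types as
type parameters):

         import org.apache.ignite.IgniteCache
         import org.apache.ignite.configuration.IgniteConfiguration
         import org.apache.ignite.spark.IgniteContext
         import org.apache.spark.{SparkConf, SparkContext}

         val sc = new SparkContext(new SparkConf().setAppName("ignite-tx"))
         val ic = new IgniteContext(sc, () => new IgniteConfiguration())

         // Local Ignite client on the driver.
         val ignite = ic.ignite()
         val cache: IgniteCache[Int, Int] = ignite.cache("partitioned")

         val tx = ignite.transactions().txStart()
         try {
           cache.put(1, 1)
           cache.put(2, 2)
           tx.commit()  // both puts become visible atomically
         } finally {
           tx.close()   // rolls back if commit() was not reached
         }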

-Val




Re: Apache Spark & Ignite Integration

Posted by pragmaticbigdata <am...@gmail.com>.
Ok. Is there a jira task that I can track for the dataframes and datasets
support?

I do have a couple of follow-up questions to understand the in-memory
representation of the shared RDD support that Ignite brings with the Spark
integration.

1. Could you detail how shared RDDs are implemented when Ignite is deployed
in standalone mode? Assuming we have an Ignite cluster containing a cache
named "partitioned", would creating an IgniteRDD through val
sharedRDD: IgniteRDD[Int,Int] = ic.fromCache("partitioned") create another
copy of the cache on the Spark executor JVM, or would the Spark executor
operate on the original copy of the cache that is present on the Ignite
nodes? I am mostly interested in understanding the performance impact of
data shuffling or movement, if there is any.

2. Since Spark does not have transaction support, how can I use the ACID
transaction support that Ignite provides when updating RDDs? A code example
would be helpful if possible.

Thanks.




Re: Apache Spark & Ignite Integration

Posted by vkulichenko <va...@gmail.com>.
Hi,

There is no direct support for data frames or data sets right now. This is
planned for the future, though.

For now you can convert an IgniteRDD to a data set just like any other RDD
(IgniteRDD is an extension of RDD), but you will lose all the
Ignite-specific advantages. Both the savePairs and sql methods are available
only on IgniteRDD.
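
For example, a rough sketch of such a conversion (assuming Spark 2.x; the
cache name is illustrative, and nothing Ignite-specific survives the
conversion):

         import org.apache.ignite.configuration.IgniteConfiguration
         import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
         import org.apache.spark.sql.SparkSession

         val spark = SparkSession.builder().appName("ignite-ds").getOrCreate()
         val ic = new IgniteContext(spark.sparkContext,
           () => new IgniteConfiguration())

         // IgniteRDD extends RDD[(K, V)], so the standard RDD-to-Dataset
         // route applies.
         val pairs: IgniteRDD[Int, String] = ic.fromCache("partitioned")

         import spark.implicits._
         val ds = pairs.toDS()  // Dataset[(Int, String)]

         // Writing back still goes through IgniteRDD.savePairs(), not the
         // Dataset itself:
         pairs.savePairs(ds.rdd)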

-Val


