Posted to user@ignite.apache.org by roshan joe <im...@gmail.com> on 2017/11/01 20:22:26 UTC

implement shared RDD in spark streaming in scala

I am very new to Ignite and went through the documentation and samples on
the web. Below is the use case I am hoping to solve using shared Spark
RDDs.

   - Spark Streaming App-1 writes incremental records that do not already
   exist (determined by a hash value generated inside the app) to a shared
   RDD.


   - Spark Streaming App-2 looks up data in this shared RDD using key
   columns and gets additional column values from the shared RDD populated
   by App-1 above.


   - Spark Streaming App-3, Spark Streaming App-4, etc. do the same lookup
   against the shared RDD, same as above.

I am sure this use case has been implemented before, and I am trying to avoid
spending time building it from scratch. Is there some sample Scala code for
this that can be shared? Thank you very much!
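
For illustration, here is a minimal sketch of the pattern I have in mind,
using Ignite's IgniteRDD API (the cache name, the String key/value types,
and the hash-based key are assumptions on my part, not tested code):

import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
import org.apache.spark.SparkContext

// App-1: write records keyed by a hash of their content. With the default
// overwrite = false, savePairs leaves keys that already exist in the cache
// untouched, so only new records are added.
def writeIncremental(sc: SparkContext): Unit = {
  val ic = new IgniteContext(sc, () => new IgniteConfiguration())
  val shared: IgniteRDD[String, String] = ic.fromCache[String, String]("sharedRDD")
  val records = sc.parallelize(Seq("record-a", "record-b"))
  shared.savePairs(records.map(r => (r.hashCode.toString, r)))
}

// App-2, App-3, etc.: look up the values for a known set of keys
// from the same shared cache.
def lookup(sc: SparkContext, keys: Set[String]): Map[String, String] = {
  val ic = new IgniteContext(sc, () => new IgniteConfiguration())
  val shared: IgniteRDD[String, String] = ic.fromCache[String, String]("sharedRDD")
  shared.filter { case (k, _) => keys.contains(k) }.collect().toMap
}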

Re: implement shared RDD in spark streaming in scala

Posted by Denis Magda <dm...@apache.org>.
Hi,

> 1. Added config/default-config.xml directly under the project and added the snippet below.

Make sure the config is located under IGNITE_HOME or provide an absolute path to it.
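
For instance, with an absolute path (the path below is just a placeholder):

val config = "/home/ec2-user/ignite/config/default-config.xml" // placeholder path
val igniteContext = new IgniteContext(sparkContext, config, true)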

> 2. Commented out the above snippet and used the one below:
> val ic = new IgniteContext(sparkContext, () => new IgniteConfiguration())
> val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("partitioned")
> sharedRDD.savePairs(sparkContext.parallelize(1 to 100000, 10).map(i => (i, i)))
> Now it complains that it "cannot find any IPs from multicast or IPFinder".
> 
> Not sure what I am doing wrong. I also have a cluster.properties file, but I am not sure if it is needed for the YARN setup, and I am not explicitly referencing it anywhere. Thank you.


Specify a port range for every IP address you list. Most likely the server nodes are not bound to port 47500 (the default one), and the Spark application is unable to locate them on the network.

This is how you set the port range:
https://github.com/apache/ignite/blob/master/examples/config/example-default.xml#L68
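
If you construct the IgniteConfiguration in code rather than XML, the
equivalent would be roughly the following (the addresses are placeholders;
47500..47509 is the default discovery port range):

import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
import scala.collection.JavaConverters._

def discoveryCfg(): IgniteConfiguration = {
  val ipFinder = new TcpDiscoveryVmIpFinder()
  // List every server node together with the discovery port range it binds to.
  ipFinder.setAddresses(Seq("10.0.0.1:47500..47509", "10.0.0.2:47500..47509").asJava)
  new IgniteConfiguration().setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder))
}

val ic = new IgniteContext(sparkContext, () => discoveryCfg())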

—
Denis

> On Nov 3, 2017, at 5:52 AM, roshan joe <im...@gmail.com> wrote:
> 
> Thanks Denis. Below are the steps I followed for Ignite with Spark on YARN, but I am stuck after spending all day on this. 
> 
> Copied apache-ignite-fabric-2.1.0-bin to all 4 slave nodes of the Spark cluster and started Ignite with ./bin/ignite.sh. After starting, it shows: 
> Topology snapshot [ver=10, servers=4, clients=0, CPUs=32, heap=4.0GB]
> 
> 
> Updated spark-env.sh on all slave nodes with IGNITE_HOME and the classpath as below:
> IGNITE_HOME=/home/ec2-user/rchalil/apache-ignite-fabric-2.1.0-bin
> IGNITE_LIBS="${IGNITE_HOME}/libs/*"
> for file in ${IGNITE_HOME}/libs/*
> do
>     if [ -d ${file} ] && [ "${file}" != "${IGNITE_HOME}"/libs/optional ]; then
>         IGNITE_LIBS=${IGNITE_LIBS}:${file}/*
>     fi
> done
> 
> export spark.driver.extraClassPath=$IGNITE_LIBS
> export spark.executor.extraClassPath=$IGNITE_LIBS
> 
> Updated the default-config.xml, as attached, with the static IPs.
> On the Spark master node, added <property name="clientMode" value="true"/>.
> 
> In the simple test code, I tried 2 things, both without success:
> 
> 1. Added config/default-config.xml directly under the project and added the snippet below.
> val CONFIG = "config/default-config.xml"
> val igniteContext = new IgniteContext(sparkContext, CONFIG, true)
> val sharedRDD: IgniteRDD[Int, Int] = igniteContext.fromCache[Int, Int]("sharedRDD")
> sharedRDD.savePairs(sparkContext.parallelize(1 to 100000, 10).map(i => (i, i)))
> sharedRDD.mapValues(x => (x * x))
> During execution, it complains: class org.apache.ignite.IgniteCheckedException: Spring XML configuration path is invalid: config/default-config.xml. Note that this path should be either absolute or a relative local file system path, relative to META-INF in classpath or valid URL to IGNITE_HOME.
> 
> 
> 2. Commented out the above snippet and used the one below:
> val ic = new IgniteContext(sparkContext, () => new IgniteConfiguration())
> val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("partitioned")
> sharedRDD.savePairs(sparkContext.parallelize(1 to 100000, 10).map(i => (i, i)))
> Now it complains that it "cannot find any IPs from multicast or IPFinder".
> 
> Not sure what I am doing wrong. I also have a cluster.properties file, but I am not sure if it is needed for the YARN setup, and I am not explicitly referencing it anywhere. Thank you. 
> 
> 
> On Thu, Nov 2, 2017 at 9:00 AM, Denis Magda <dmagda@apache.org> wrote:
> Hi,
> 
> You can look at my example prepared for Ignite and Spark streaming use cases:
> https://github.com/dmagda/IgniteSparkIoT
> 
> The data is streamed into Ignite through a shared RDD and queried using Ignite SQL. If you know the keys in advance, you can use Spark's native RDD APIs.
> 
> That example is in Java, but it is easily convertible to Scala.
> 
> Also, there is a simple Scala example available in every Ignite distribution that should be useful:
> https://github.com/apache/ignite/blob/master/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala
> 
> —
> Denis
> 
>> On Nov 1, 2017, at 1:22 PM, roshan joe <impdocs2008@gmail.com> wrote:
>> 
>> I am very new to Ignite and went through the documentation and samples on the web. Below is the use case I am hoping to solve using shared Spark RDDs. 
>> 	• Spark Streaming App-1 writes incremental records that do not already exist (determined by a hash value generated inside the app) to a shared RDD. 
>> 	• Spark Streaming App-2 looks up data in this shared RDD using key columns and gets additional column values from the shared RDD populated by App-1 above. 
>> 	• Spark Streaming App-3, Spark Streaming App-4, etc. do the same lookup against the shared RDD, same as above. 
>> I am sure this use case has been implemented before, and I am trying to avoid spending time building it from scratch. Is there some sample Scala code for this that can be shared? Thank you very much!
>> 
>> 
>>  
> 
> 
> <default-config.xml>


Re: implement shared RDD in spark streaming in scala

Posted by Denis Magda <dm...@apache.org>.
Hi,

You can look at my example prepared for Ignite and Spark streaming use cases:
https://github.com/dmagda/IgniteSparkIoT

The data is streamed into Ignite through a shared RDD and queried using Ignite SQL. If you know the keys in advance, you can use Spark's native RDD APIs.

That example is in Java, but it is easily convertible to Scala.
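
For reference, the two access paths might look roughly like this in Scala
(the cache name and the Integer query type are assumptions):

import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spark.{IgniteContext, IgniteRDD}

// Assumes an existing SparkContext named sparkContext, as elsewhere in this thread.
val ic = new IgniteContext(sparkContext, () => new IgniteConfiguration())
val shared: IgniteRDD[Int, Int] = ic.fromCache[Int, Int]("partitioned")

// SQL path: works only if the cache declares Integer as an indexed/query type.
val rows = shared.sql("select _key, _val from Integer where _val > ?", 100)
rows.show()

// Native RDD path, when the keys of interest are known in advance.
val hits = shared.filter { case (k, _) => k % 1000 == 0 }.take(5)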

Also, there is a simple Scala example available in every Ignite distribution that should be useful:
https://github.com/apache/ignite/blob/master/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala

—
Denis

> On Nov 1, 2017, at 1:22 PM, roshan joe <im...@gmail.com> wrote:
> 
> I am very new to Ignite and went through the documentation and samples on the web. Below is the use case I am hoping to solve using shared Spark RDDs. 
> 	• Spark Streaming App-1 writes incremental records that do not already exist (determined by a hash value generated inside the app) to a shared RDD. 
> 	• Spark Streaming App-2 looks up data in this shared RDD using key columns and gets additional column values from the shared RDD populated by App-1 above. 
> 	• Spark Streaming App-3, Spark Streaming App-4, etc. do the same lookup against the shared RDD, same as above. 
> I am sure this use case has been implemented before, and I am trying to avoid spending time building it from scratch. Is there some sample Scala code for this that can be shared? Thank you very much!
> 
> 
>