Posted to user@ignite.apache.org by Ranjit Sahu <ra...@gmail.com> on 2017/02/10 03:23:05 UTC

Help needed

Hi Guys,

We are trying to use the Ignite key-value store inside a Spark cluster. What
we are trying to do is:

1) Load the data into a data frame in Spark
2) While transforming the data frame, start the Ignite cache on each
executor node and load the data into it.


The issue we are seeing is that when I start with 40 executor nodes, I can
see only 24+ nodes in the Ignite topology. There are no exceptions in the
log, and the full data is not available in the cache either.

We were not able to use Ignite's SharedRDD because we are in a nested RDD
situation, and to avoid that we are trying to work with the Ignite cache.

Any clue what's going wrong here?

Thanks,
Ranjit

Re: Help needed

Posted by Ranjit Sahu <ra...@gmail.com>.
Hi Jorn,

The use case we are working on is something like this. We have three
functions: one to extract entities, one to resolve them, and, for resolving,
an accessor.

First we build an RDD by calling the extract function (step 1 below).
Next we prepare the lookup RDD; let's call it the Accessor.
Third is the resolve step, which takes the RDD from step 1 and calls the
resolve function, which internally needs the Accessor, which is itself an
RDD.
As we are working with legacy code, we can't do a join.

1) Input data -> extract function -> ExtRDD

2) Accessor -> LookupRDD

3) ExtRDD -> resolve function -> ExtRDD.map(x -> resolve(Accessor))

With this we ended up with a nested RDD issue and got the exception below.

17/01/19 10:18:40 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
2, c911tae.int.westgroup.com): org.apache.spark.SparkException: *RDD
transformations and actions can only be invoked by the driver, not
inside of other transformations; for example, rdd1.map(x =>
rdd2.values.count() * x) is invalid because the values transformation
and count action cannot be performed inside of the rdd1.map
transformation. For more information, see SPARK-5063.*

        at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)

        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)

        at org.apache.spark.rdd.RDD.toArray(RDD.scala:929)

        at

        at java.lang.Thread.run(Thread.java:745)


To resolve this, the thought was to replace the Accessor, which is an RDD,
with an IgniteCache.


Below is the code I am trying.

import scala.collection.JavaConverters._
import scala.collection.mutable

// Load the content into a data frame

val dataframe = sqlContext.read
  .format("com.databricks.spark.avro")
  .option("header", "true")
  .load(sparkConf.get("spark.data.avroFiles"))


// Convert each row of the data frame to an EntityVO

val entityVOs = dataframe.map(row => {
      val wId = row.getAs[Long]("wId").toString
      val gId = row.getAs[String]("gId")
      val oaId = row.getAs[String]("oaId")
      val canName = row.getAs[String]("canName")
      // variantName may be null in the source data; default to an empty collection
      val variants = row.getAs[mutable.WrappedArray[String]]("variantName")
      val variantName =
        if (variants != null) variants.toList.asJavaCollection
        else List[String]().asJavaCollection
      new EntityVO(wId, gId, oaId, canName, variantName)
    })

In the code below, which is launched from the Spark driver program, I am
trying to build the cache on the worker nodes, wherever the function
executes.


 /** Convert each partition to entity objects and load them into the cache **/

    entityVOs.mapPartitions(x => {

      // Start an Ignite node in this executor JVM, or reuse a running one
      val ignite: Ignite = Ignition.getOrStart(new IgniteConfiguration())
      val orgCacheCfg: CacheConfiguration[String, EntityVO] =
        new CacheConfiguration[String, EntityVO](MyCacheName)
      orgCacheCfg.setIndexedTypes(classOf[String], classOf[EntityVO])
      orgCacheCfg.setCacheMode(CacheMode.PARTITIONED)
      val cache: IgniteCache[String, EntityVO] =
        ignite.getOrCreateCache(orgCacheCfg)
      while (x.hasNext) {
        val entityVo = x.next()
        cache.put(entityVo.wId, entityVo)
      }
      x // already consumed, so count() below only forces the job to run
    }).count()


This is my first RDD, created for a few sample entities:

val entityNames = Seq("YAMAHA MOTOR CORPORATION", "AVADO BRANDS ",
  "MADISON MINERALS ", "PALM ", "CAREERENGINE NETWORK INC",
  "KJEMHUS BULK SALES")

val entityNamesRDD = sparkContext.parallelize(entityNames)



Now the last step, calling the resolve function:


entityNamesRDD.map(entityName => {

  companyAccessor.getCompanyExact(entityName)

}).collect().foreach(println)


Below is my function, where I try to connect to Ignite again and run a query:


def getCompanyExact(entityName: String): Vector[EntityVO] = {
  // Reuse the Ignite node already running in this JVM, or start one
  val ignite: Ignite = Ignition.getOrStart(new IgniteConfiguration())
  val cacheConfig: CacheConfiguration[String, EntityVO] =
    new CacheConfiguration[String, EntityVO](MyCacheName)
  cacheConfig.setIndexedTypes(classOf[String], classOf[EntityVO])
  val wcaIgniteCache: IgniteCache[String, EntityVO] =
    ignite.getOrCreateCache(cacheConfig)
  val queryString = "canonicalName = ?"
  val companyNameQuery =
    new SqlQuery[String, EntityVO]("EntityVO", queryString).setArgs(entityName)
  val results = wcaIgniteCache.query(companyNameQuery).getAll()
  val listIter = results.listIterator()
  val companyResults = ListBuffer[EntityVO]()
......

  println(companyResults)
  companyResults.toVector
}


The problem we are having is that we are not able to see all the data in
the cache when we query. Second, even though I start with 40 executor
nodes, I can see only 20+ nodes in the cache topology.

Sorry for the long email.


Thanks,

Ranjit




On Fri, Feb 10, 2017 at 11:00 AM, Jörn Franke <jo...@gmail.com> wrote:

> Not sure I got the picture of your setup, but the Ignite cache should be
> started independently of the application and not within the application.
>
> Aside from that, can you please elaborate more on the problem you would
> like to solve - maybe with pseudocode? I am not sure if the approach you
> have selected is the right one.

Re: Help needed

Posted by Jörn Franke <jo...@gmail.com>.
Not sure I got the picture of your setup, but the Ignite cache should be started independently of the application and not within the application.

Aside from that, can you please elaborate more on the problem you would like to solve - maybe with pseudocode? I am not sure if the approach you have selected is the right one.

Re: Help needed

Posted by Andrey Mashenkov <an...@gmail.com>.
Hi Ranjit,

Don't worry about "-1% free offheap". It is a minor bug.

On Thu, Feb 16, 2017 at 11:03 PM, Ranjit Sahu <ra...@gmail.com> wrote:

> Hi Val,
>
> This got resolved. What was happening is that, because we were starting
> the nodes from the driver using a map function, they were all starting at
> the same time and were not able to find each other.
>
> So what I did was start the Ignite node on the driver first and pass the
> IP of the driver to the others so that they can join each other.
>
> One new issue I am seeing is that sometimes not all nodes start and the
> call hangs.
>
> The metrics log reports non-heap memory free as -1%. Any clue what's going
> wrong?


-- 
Best regards,
Andrey V. Mashenkov

Re: Help needed

Posted by Ranjit Sahu <ra...@gmail.com>.
Hi Val,

This got resolved. What was happening is that, because we were starting the
nodes from the driver using a map function, they were all starting at the
same time and were not able to find each other.

So what I did was start the Ignite node on the driver first and pass the IP
of the driver to the others so that they can join each other.
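
A minimal sketch of this driver-first approach, assuming the driver's address is handed to every node through a static IP finder. The names DriverFirstDiscovery, configFor and driverHost are illustrative and do not appear in the code above; 47500 is Ignite's default discovery port, and the ..47509 range is an assumption.

```scala
import java.util.Collections

import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder

// Hypothetical helper: builds the configuration shared by the driver node
// and by every executor node.
object DriverFirstDiscovery {

  /** Returns a configuration whose discovery looks only at the driver's host. */
  def configFor(driverHost: String): IgniteConfiguration = {
    val ipFinder = new TcpDiscoveryVmIpFinder()
    // Static address list instead of multicast: "host:firstPort..lastPort".
    ipFinder.setAddresses(Collections.singletonList(s"$driverHost:47500..47509"))

    val discovery = new TcpDiscoverySpi()
    discovery.setIpFinder(ipFinder)

    val cfg = new IgniteConfiguration()
    cfg.setDiscoverySpi(discovery)
    cfg
  }
}
```

With something like this, the driver would call Ignition.start(DriverFirstDiscovery.configFor(driverHost)) before launching the job, and each executor task would call Ignition.getOrStart with the same configuration, so that every node looks for the driver rather than relying on multicast.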

One new issue I am seeing is that sometimes not all nodes start and the call
hangs.

The metrics log reports non-heap memory free as -1%. Any clue what's going
wrong?

On Thu, 16 Feb 2017 at 12:28 AM, vkulichenko <va...@gmail.com>
wrote:

> Ranjit,
>
> You can use any shared IP finder:
> https://apacheignite.readme.io/docs/cluster-config
>
> If multicast doesn't work, consider using JDBC or shared FS based IP
> finders.
>
> -Val

Re: Help needed

Posted by vkulichenko <va...@gmail.com>.
Ranjit,

You can use any shared IP finder:
https://apacheignite.readme.io/docs/cluster-config

If multicast doesn't work, consider using JDBC or shared FS based IP
finders.
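
As a rough illustration of the shared-FS option, assuming a directory that every host sees as a regular local path (for example an NFS mount). SharedFsDiscovery, configFor and the directory passed in are hypothetical names used only for this sketch.

```scala
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinder

// Hypothetical helper: every node registers its address in a shared
// directory and reads the addresses of the other nodes from it.
object SharedFsDiscovery {

  /** Returns a configuration that discovers peers through `sharedDir`. */
  def configFor(sharedDir: String): IgniteConfiguration = {
    val ipFinder = new TcpDiscoverySharedFsIpFinder()
    // Assumption: the same path is mounted and writable on every host.
    ipFinder.setPath(sharedDir)

    val discovery = new TcpDiscoverySpi()
    discovery.setIpFinder(ipFinder)

    val cfg = new IgniteConfiguration()
    cfg.setDiscoverySpi(discovery)
    cfg
  }
}
```

Because nodes find each other through the directory contents, this does not depend on multicast being enabled on the network or on knowing any host address in advance.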

-Val



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Help-needed-tp10540p10653.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: Help needed

Posted by Ranjit Sahu <ra...@gmail.com>.
Yep, I will change that so that we load to basically all nodes. It will end
up as my initial function again: start and load on each executor.
One more thing I noticed is that when I connect as a client and load, it is
150+% slower. Any pointers here?

We have a shared service in Java which is used for entity extraction and
resolution. We are working on making it Spark friendly.
The resolution function basically needs an accessor (a DAO) and connects to
an external resource to do its work.
The idea is to use Ignite in its place. The flow is something like this:
call the resolve function on one RDD of input data; inside the resolve
function, use Ignite to query and resolve to the correct entity.


The blocker for us now is that when I start the executor nodes, say 20, not
all of them join each other in Ignite. Sometimes I can also see many
subgroups.
What should we look at? I did not configure any static IP for discovery
here, as resources are managed by YARN for Spark.

I have tried the multicast group address which we use in another project
for Oracle Coherence. That is not helping either.


Thanks a ton; I appreciate your help and quick responses to my emails.

Thanks,
Ranjit



On Tue, Feb 14, 2017 at 9:54 PM, vkulichenko <va...@gmail.com>
wrote:

> Well, a node will be started only on executors where getOrStart is called.
> So if the number of nodes is less than the number of executors, it means
> that you didn't visit all executors when mapping the original RDD.
>
> What is the business use case behind that? I'm not sure you chose the
> right approach.
>
> -Val

Re: Help needed

Posted by vkulichenko <va...@gmail.com>.
Well, a node will be started only on executors where getOrStart is called. So
if the number of nodes is less than the number of executors, it means that
you didn't visit all executors when mapping the original RDD.
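
One way to check this, sketched under the assumption that a small diagnostic job may be run before starting Ignite: record which executor each task lands on and count the hits. ExecutorCoverageSketch and executorsVisited are hypothetical names; SparkEnv.get.executorId is Spark's identifier for the executor running the current task.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

object ExecutorCoverageSketch {

  /** Runs `tasks` one-element partitions and counts the tasks per executor id. */
  def executorsVisited(sc: SparkContext, tasks: Int): Map[String, Int] =
    sc.parallelize(1 to tasks, tasks)
      .mapPartitions(_ => Iterator(SparkEnv.get.executorId))
      .collect()
      .groupBy(identity)
      .map { case (executorId, hits) => executorId -> hits.length }

  def main(args: Array[String]): Unit = {
    // Falls back to local mode only when no master was supplied by spark-submit.
    val conf = new SparkConf().setAppName("executor-coverage").setIfMissing("spark.master", "local[2]")
    val sc = new SparkContext(conf)
    try {
      executorsVisited(sc, 4).foreach { case (id, n) => println(s"executor $id ran $n task(s)") }
    } finally sc.stop()
  }
}
```

If the resulting map has fewer keys than the number of executors, some executors received no task, so a job of the same shape would not start an Ignite node on them.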

What is the business use case behind that? I'm not sure you chose the right
approach.

-Val



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Help-needed-tp10540p10636.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: Help needed

Posted by Ranjit Sahu <ra...@gmail.com>.
One more point to add: in my case the query will start from an executor
node and not the driver node.

On Tue, Feb 14, 2017 at 2:42 PM, Ranjit Sahu <ra...@gmail.com> wrote:

> Hi Val,
>
> Let me explain it again. What I am looking for is to build the cache for
> each Spark application and destroy it when the Spark app completes.
> Something like you guys have with IgniteRDD in embedded mode. I can't use
> IgniteRDD as we are getting into a nested RDD situation with our legacy
> code.

Re: Help needed

Posted by Ranjit Sahu <ra...@gmail.com>.
Hi Val,

Let me explain it again. What I am looking for is to build the cache for
each Spark application and destroy it when the Spark app completes.
Something like you guys have with IgniteRDD in embedded mode. I can't use
IgniteRDD as we are getting into a nested RDD situation with our legacy
code.

I changed my code a little bit now. Please have a look and let me know if
this looks OK.


1) val workers = sparkContext.getConf.getInt("spark.executor.instances",
     sparkContext.getExecutorStorageStatus.length)

    // Start an Ignite server node on each worker.
    sparkContext.parallelize(1 to workers, workers).foreachPartition(it => ignite())

 def ignite(): Ignite = {
      val cfg = new IgniteConfiguration()
      val ignite: Ignite = Ignition.start(cfg)
      logInfo("Starting ignite node")
      ignite
    }

2)  val dataframe = sqlContext.read
      .format("com.databricks.spark.avro")
      .option("header", "true")
      .load(sparkConf.get("spark.data.avroFiles"))

val EntityRDD = dataframe.map (Deserialize and save)

3) Load each partition into the cache

  EntityRDD.mapPartitions(x => {

      val cfg = new IgniteConfiguration()
      // Join the cluster as a client node
      Ignition.setClientMode(true)
      val ignite: Ignite = Ignition.getOrStart(cfg)
      val orgCacheCfg: CacheConfiguration[String, EntityVO] =
        new CacheConfiguration[String, EntityVO](MyCache)
      orgCacheCfg.setIndexedTypes(classOf[String], classOf[EntityVO])
      orgCacheCfg.setCacheMode(CacheMode.PARTITIONED)
      val cache: IgniteCache[String, EntityVO] =
        ignite.getOrCreateCache(orgCacheCfg)
      while (x.hasNext) {
        val entityvo = x.next()
        cache.put(entityvo.Id, entityvo)
      }
      x // already consumed, so count() below only forces the job to run
    }).count()
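
Step 3 writes entries one at a time with cache.put. Ignite also provides IgniteDataStreamer, which batches entries before sending them to the nodes that own them. The following is only a sketch of how that step could be written with it, assuming the cache has already been created; StreamLoadSketch and load are hypothetical names, and no measurement in this thread shows how much it would help here.

```scala
import org.apache.ignite.{Ignite, IgniteDataStreamer}

object StreamLoadSketch {

  /** Streams (key, value) pairs into an existing cache; returns how many were sent. */
  def load[K, V](ignite: Ignite, cacheName: String, entries: Iterator[(K, V)]): Long = {
    // The named cache must already exist on the cluster.
    val streamer: IgniteDataStreamer[K, V] = ignite.dataStreamer[K, V](cacheName)
    try {
      var sent = 0L
      entries.foreach { case (key, value) =>
        streamer.addData(key, value) // buffered and sent in batches
        sent += 1
      }
      sent
    } finally streamer.close() // close() flushes whatever is still buffered
  }
}
```

Inside mapPartitions this could be called as StreamLoadSketch.load(ignite, MyCache, x.map(e => e.Id -> e)), in place of the while loop.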

4) Use the cache for lookup:

 entityNamesRDD.map(entityName => {

       val cfg = new IgniteConfiguration()

       // Join the cluster as a client node
       Ignition.setClientMode(true)

       val ignite: Ignite = Ignition.getOrStart(cfg)
       val cacheConfig: CacheConfiguration[String, EntityVO] =
         new CacheConfiguration[String, EntityVO](MyCache)
       cacheConfig.setIndexedTypes(classOf[String], classOf[EntityVO])
       val wcaIgniteCache: IgniteCache[String, EntityVO] =
         ignite.getOrCreateCache(cacheConfig)
       val queryString = "canonicalName = ?"
       val companyNameQuery =
         new SqlQuery[String, EntityVO]("EntityVO", queryString).setArgs(entityName)
       val results = wcaIgniteCache.query(companyNameQuery).getAll()
       val listIter = results.listIterator()
       val compResults = ListBuffer[EntityVO]()
       // Collect the matching values from the query result entries
       while (listIter.hasNext) {
         val compObject = listIter.next()
         if (compObject.getValue.isInstanceOf[EntityVO])
           compResults += compObject.getValue.asInstanceOf[EntityVO]
       }
       compResults.toVector

   }).collect().foreach(println)




Thanks,
Ranjit




On Tue, Feb 14, 2017 at 3:08 AM, vkulichenko <va...@gmail.com>
wrote:

> Hi Ranjit,
>
> Not sure I understood. The main problem with executors is that they are
> controlled by Spark and they are created per application. So you can't
> share
> the data stored in embedded mode and it's not really safe to store it
> there.
> This can be useful only for some simple tests/demos, but not for real apps.
> Let me know if I'm missing something in your use case.
>
> -Val

Re: Help needed

Posted by vkulichenko <va...@gmail.com>.
Hi Ranjit,

Not sure I understood. The main problem with executors is that they are
controlled by Spark and they are created per application. So you can't share
the data stored in embedded mode and it's not really safe to store it there.
This can be useful only for some simple tests/demos, but not for real apps.
Let me know if I'm missing something in your use case.

-Val



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Help-needed-tp10540p10607.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: Help needed

Posted by Ranjit Sahu <ra...@gmail.com>.
Hi Val,

Actually, the idea was not to build something dedicated like a standalone
cluster for Ignite.
Do you foresee any issues if I run Ignite inside the executor nodes?
So when I start my Spark job, the Ignite cluster will effectively be the
executor nodes, and we will be querying the cache from those same executor
nodes.


Thanks,
Ranjit

On Sat, Feb 11, 2017 at 4:13 AM, vkulichenko <va...@gmail.com>
wrote:

> Ranjit,
>
> I would recommend you to start a standalone Ignite cluster, separate from
> Spark. You may also need to specify clientMode=true in the configuration
> provided to IgniteContext, to make sure that nodes on executors are
> clients,
> not servers.
>
> -Val

Re: Help needed

Posted by vkulichenko <va...@gmail.com>.
Ranjit,

I would recommend you to start a standalone Ignite cluster, separate from
Spark. You may also need to specify clientMode=true in the configuration
provided to IgniteContext, to make sure that nodes on executors are clients,
not servers.
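
A minimal sketch of what a client-mode configuration on the executors could look like, assuming a standalone server cluster is already running and reachable. ExecutorIgniteClient and clientConfig are hypothetical names; the IgniteContext constructor is left out because its signature differs between Ignite versions, but a function such as clientConfig is the kind of configuration closure it expects.

```scala
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.configuration.IgniteConfiguration

// Hypothetical per-JVM holder for the executor-side Ignite node.
object ExecutorIgniteClient {

  /** Configuration for a client node: it joins the cluster but stores no cache data. */
  def clientConfig(): IgniteConfiguration = {
    val cfg = new IgniteConfiguration()
    cfg.setClientMode(true)
    cfg
  }

  /** Started at most once per executor JVM, on first use inside a task. */
  lazy val ignite: Ignite = Ignition.getOrStart(clientConfig())
}
```

Tasks would then refer to ExecutorIgniteClient.ignite instead of building a configuration and calling getOrStart for every record.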

-Val



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Help-needed-tp10540p10558.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.