Posted to user@ignite.apache.org by manuelmourato <ma...@gmail.com> on 2017/10/28 20:47:10 UTC

Using two ignite contexts with spark streaming

Hello there,

My use case is relatively simple: I want to be able to save an RDD in Spark
to two different caches in Ignite, inside the same Spark Context.

When I try to save an RDD to a single IgniteCache, everything works well:

  case class Sensor_Att(
    @(QuerySqlField @field)(index = false) active: String,
    @(QuerySqlField @field)(index = false) `type`: String,
    @(QuerySqlField @field)(index = true) name: String
  )

    val sqlContext: SparkSession =
      SparkSession.builder().master("local[*]").appName("DataProcessing").getOrCreate()
    val sc: SparkContext = sqlContext.sparkContext
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))

    val ignitec: IgniteContext = new IgniteContext(sc, () =>
      new IgniteConfiguration()
        .setClientMode(false)
        .setLocalHost("127.0.0.1")
        .setActiveOnStart(true)
        .setCacheConfiguration(
          new CacheConfiguration[String, Sensor_Att]()
            .setIndexedTypes(classOf[String], classOf[Sensor_Att])
            .setName("sensorData")
            .setCacheMode(CacheMode.PARTITIONED)
            .setAtomicityMode(CacheAtomicityMode.ATOMIC))
        .setDiscoverySpi(
          new TcpDiscoverySpi()
            .setLocalAddress("127.0.0.1")
            .setLocalPort(47090)
            .setIpFinder(
              new TcpDiscoveryMulticastIpFinder()
                .setAddresses(new util.ArrayList[String])))
        .setCommunicationSpi(new TcpCommunicationSpi().setLocalPort(47000)))


    val cachedRDD: IgniteRDD[String, Sensor_Att] = ignitec.fromCache("Data1")
    val RDD_with_key: RDD[(String, Sensor_Att)] = df_RDD_NEW_CLASS.map(x => (x.name, x))
    cachedRDD.savePairs(RDD_with_key)
    val df = cachedRDD.sql("select * from Sensor_Att")
    df.show()


However, if I add a second IgniteContext that uses the same class as its
indexed type, and try to save an RDD to its cache, like so:

(code above...)
    val ignitec = ...
    val ignitec2: IgniteContext = new IgniteContext(sc, () =>
      new IgniteConfiguration()
        .setClientMode(false)
        .setLocalHost("127.0.0.1")
        .setActiveOnStart(true)
        .setCacheConfiguration(
          new CacheConfiguration[String, Sensor_Att]()
            .setIndexedTypes(classOf[String], classOf[Sensor_Att])
            .setName("historicsensorData")
            .setCacheMode(CacheMode.PARTITIONED)
            .setAtomicityMode(CacheAtomicityMode.ATOMIC))
        .setDiscoverySpi(
          new TcpDiscoverySpi()
            .setLocalAddress("127.0.0.1")
            .setLocalPort(47091)
            .setIpFinder(
              new TcpDiscoveryMulticastIpFinder()
                .setAddresses(new util.ArrayList[String])))
        .setCommunicationSpi(new TcpCommunicationSpi().setLocalPort(47007)))

(code above....)
    val df = cachedRDD.sql("select * from Sensor_Att")
    df.show()

    val cachedRDD2: IgniteRDD[String, Sensor_Att] = ignitec.fromCache("Data2")
    cachedRDD2.savePairs(RDD_with_key)
    val df2 = cachedRDD2.sql("select * from Sensor_Att")
    df2.show()


I get the following error:

javax.cache.CacheException: class org.apache.ignite.internal.processors.query.IgniteSQLException: Failed to parse query: select * from Sensor_Att
	at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.query(IgniteCacheProxy.java:807)
	at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.query(IgniteCacheProxy.java:765)
	at org.apache.ignite.spark.IgniteRDD.sql(IgniteRDD.scala:147)
	at sensorApp.SensorDataProcessing$.sensorApp$SensorDataProcessing$$data_proces
        (...)


It seems that I can't derive a second IgniteContext from the same
SparkContext, since the "Data2" cache apparently was not created.
Do you have any suggestions?

Thank you.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Using two ignite contexts with spark streaming

Posted by Denis Mekhanikov <dm...@gmail.com>.
Hi!

I don't quite see what you are trying to achieve.
In the example you provided, the second IgniteContext, ignitec2, is never
used. Do you mean that once you start the second IgniteContext, both of
them stop working?

When ignitec.fromCache("Data2") is executed, the cache is created if it
does not exist yet. If the log shows that SQL execution fails because of
missing tables, then the query entities are probably not configured
correctly.
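
To illustrate the point about query entities, here is a minimal sketch (not
taken from the original code) of a single IgniteContext serving both caches.
It assumes Ignite 2.x with the ignite-spark module, where
IgniteContext.fromCache also accepts a CacheConfiguration. The object name
TwoCachesSketch and the helpers sensorCacheCfg and saveToBoth are
hypothetical, and the default IgniteConfiguration is only a stand-in for
whatever discovery settings your cluster needs.

```scala
import org.apache.ignite.cache.query.annotations.QuerySqlField
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import scala.annotation.meta.field

// Same shape as the class in the question.
case class Sensor_Att(
  @(QuerySqlField @field)(index = false) active: String,
  @(QuerySqlField @field)(index = false) `type`: String,
  @(QuerySqlField @field)(index = true) name: String
)

object TwoCachesSketch {
  // Hypothetical helper: a cache configuration whose indexed types
  // register Sensor_Att as an SQL table inside that cache.
  def sensorCacheCfg(cacheName: String): CacheConfiguration[String, Sensor_Att] =
    new CacheConfiguration[String, Sensor_Att](cacheName)
      .setIndexedTypes(classOf[String], classOf[Sensor_Att])

  // Hypothetical helper: saves the same pair RDD into two caches
  // through one IgniteContext, then queries each cache.
  def saveToBoth(sc: SparkContext, pairs: RDD[(String, Sensor_Att)]): Unit = {
    // Assumption: default configuration; add discovery settings as needed.
    val ic = new IgniteContext(sc, () => new IgniteConfiguration())

    for (cacheName <- Seq("sensorData", "historicsensorData")) {
      // Passing the configuration (not just the name) carries the
      // indexed types along when the cache is created.
      val rdd: IgniteRDD[String, Sensor_Att] =
        ic.fromCache[String, Sensor_Att](sensorCacheCfg(cacheName))
      rdd.savePairs(pairs)
      rdd.sql("select * from Sensor_Att").show()
    }
  }
}
```

With this shape, each cache should be created with Sensor_Att registered as
an indexed type, so "select * from Sensor_Att" has a table to resolve in
both caches. As far as I can tell, fromCache called with only a cache name
creates a missing cache with default settings, which would leave it without
any SQL tables.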

Please put together a minimal example that fails for you and attach it
here as an archived project. That will make it easier to tell what is wrong.

Denis
