Posted to user@ignite.apache.org by "Rosellini, Luca" <lr...@keedio.com> on 2018/03/23 13:31:03 UTC

Saving a DataFrame from Spark and accessing data as key/value externally

Hi all,
I am using Apache Ignite 2.4 and I've successfully saved a Spark Dataframe
as a SQL table in the Ignite caching layer.

I am trying to access the data from an external Java program (completely
unrelated to the Spark Job that produced and saved the table) using the
Cache API, as if it were a key/value store.

The table, called 'PERSON', has a primary key field called UUID and maps to
an Ignite cache called SQL_PUBLIC_PERSON.

Using the Ignite Cache API, I can check that a specific entry
exists in the cache by calling:

cache.containsKey(...)


However, if I try to get the value by calling cache.get(...) for a specific
key, I get a ClassNotFoundException (the full stack trace is attached).
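For context, the bottom of that stack trace (quoted later in this thread) shows the failure is an ordinary `Class.forName(...)` lookup. When `cache.get(...)` unwraps the stored value, Ignite looks for a Java class with the generated type name, and the client program has no such class. The `String` key apparently needs no such lookup, which would explain why `containsKey(...)` succeeds. The stdlib-only sketch below reproduces just that lookup. The class `GeneratedClassLookup` is hypothetical and is not Ignite code.

```java
// Hypothetical, stdlib-only illustration; GeneratedClassLookup is not part of Ignite.
// It repeats the step at the root of the stack trace: resolving a class by name.
public class GeneratedClassLookup {

    /** Returns true if this JVM can load a class with the given name. */
    static boolean isLoadable(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            // Same exception type that Ignite wraps when it cannot deserialize the value.
            return false;
        }
    }

    public static void main(String[] args) {
        // Generated value type name copied from the stack trace in this thread.
        String generated = "SQL_PUBLIC_PERSON_eafe5efa_a96b_4f76_a72b_7b9e4c2d3fcc";
        System.out.println(generated + " loadable: " + isLoadable(generated));
        System.out.println("java.lang.String loadable: " + isLoadable("java.lang.String"));
    }
}
```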

My guess is that Ignite dynamically generated a schema bean for my DataFrame
when saving the DataFrame from Spark.
Since the generated bean class name also seems to follow some
internal rule (in this example it's
'SQL_PUBLIC_PERSON_da18b6a2_8b41_4c34_9451_6fd9ace8e73d'), I am not sure whether
this usage pattern makes sense at all.
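The names in this thread appear to follow a simple pattern: the cache is `SQL_<SCHEMA>_<TABLE>` (here `SQL_PUBLIC_PERSON`), and the value type is the cache name plus a UUID with dashes turned into underscores. If that UUID is random for each table creation (an assumption, but consistent with `da18b6a2_...` in this message and `eafe5efa_...` in the later stack trace), no hand-written class could match it. The hypothetical `SqlTableNaming` class below only mirrors the observed pattern; it is not Ignite's implementation.

```java
import java.util.UUID;

// Illustrative sketch only: mirrors the naming pattern observed in this thread.
// SqlTableNaming is hypothetical and is not Ignite's own code.
public class SqlTableNaming {

    /** Cache name observed for a SQL table: SQL_<SCHEMA>_<TABLE>. */
    static String cacheName(String schema, String table) {
        return "SQL_" + schema + "_" + table;
    }

    /** Value type name observed: cache name + "_" + UUID with '-' replaced by '_'. */
    static String valueTypeName(String cacheName, UUID id) {
        return cacheName + "_" + id.toString().replace('-', '_');
    }

    public static void main(String[] args) {
        String cache = cacheName("PUBLIC", "PERSON");
        System.out.println(cache); // SQL_PUBLIC_PERSON
        // Assumed random UUID: two table creations give two different type names.
        System.out.println(valueTypeName(cache, UUID.randomUUID()));
        System.out.println(valueTypeName(cache, UUID.randomUUID()));
    }
}
```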

I am very new to Apache Ignite, so I apologize if this is a silly
question, but I could not find any clue in the official documentation.

Thanks,
Luca

Re: Saving a DataFrame from Spark and accessing data as key/value externally

Posted by "Rosellini, Luca" <lr...@keedio.com>.
Hi Nikolay,
it works, thank you.

Luca


2018-03-26 22:29 GMT+02:00 Nikolay Izhikov <ni...@apache.org>:

> Hello, Luca.
>
> Please try using the *withKeepBinary()* method, so that SQL rows read from
> the cache are kept in binary (BinaryObject) form instead of being deserialized.
> Please write to me if it doesn't help.
>
>   @Test
>   public void testIgniteCaches() {
>     Ignite ignite = Ignition.start("/Users/luca/java/apache-ignite-fabric-2.4.0-bin/config/default-config.xml");
>
>     IgniteCache<String, Object> cache = ignite.getOrCreateCache("SQL_PUBLIC_PERSON").withKeepBinary();
>
>     // OK
>     System.out.println(cache.containsKey("C2A9BCD3-FE93-4E44-98D9-043086A154A3"));
>
>     // returns a BinaryObject instead of throwing ClassNotFoundException
>     System.out.println(cache.get("C2A9BCD3-FE93-4E44-98D9-043086A154A3"));
>   }

Re: Saving a DataFrame from Spark and accessing data as key/value externally

Posted by Nikolay Izhikov <ni...@apache.org>.
Hello, Luca.

Please try using the *withKeepBinary()* method, so that SQL rows read from the cache are kept in binary (BinaryObject) form instead of being deserialized.
Please write to me if it doesn't help.

  @Test
  public void testIgniteCaches() {
    Ignite ignite = Ignition.start("/Users/luca/java/apache-ignite-fabric-2.4.0-bin/config/default-config.xml");

    // withKeepBinary() returns a view of the cache that does not deserialize values
    IgniteCache<String, Object> cache = ignite.getOrCreateCache("SQL_PUBLIC_PERSON").withKeepBinary();

    // OK
    System.out.println(cache.containsKey("C2A9BCD3-FE93-4E44-98D9-043086A154A3"));

    // returns a BinaryObject instead of throwing ClassNotFoundException
    System.out.println(cache.get("C2A9BCD3-FE93-4E44-98D9-043086A154A3"));
  }
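With `withKeepBinary()` in place, `cache.get(...)` hands back the row as a binary object whose fields are read by name, so no generated class is needed. The stdlib-only sketch below shows that idea in isolation: a hypothetical `toMap` helper copies named fields into a `Map`, given only the field names and a read-by-name function. A plain `Map` stands in for the binary value here; nothing in the block is Ignite API.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: copies a row's named fields into a Map without needing
// any Java class for the row itself, only field names and a reader function.
public class BinaryRowReader {

    static Map<String, Object> toMap(Collection<String> fieldNames, Function<String, Object> readField) {
        Map<String, Object> row = new LinkedHashMap<>(); // keeps field order
        for (String name : fieldNames) {
            row.put(name, readField.apply(name));
        }
        return row;
    }

    public static void main(String[] args) {
        // Stand-in for a binary value, keyed by (assumed upper-case) column names.
        Map<String, Object> fake = new LinkedHashMap<>();
        fake.put("NAME", "Raffaello");
        fake.put("SURNAME", "Sanzio");
        System.out.println(toMap(Arrays.asList("NAME", "SURNAME"), fake::get)); // {NAME=Raffaello, SURNAME=Sanzio}
    }
}
```

With Ignite, the same helper could presumably be fed from the value of a `withKeepBinary()` cache, for example `toMap(person.type().fieldNames(), person::field)` where `person` is the `org.apache.ignite.binary.BinaryObject` returned by `get(...)`. The upper-case column names `NAME` and `SURNAME` are an assumption (unquoted SQL identifiers are usually upper-cased), so check `fieldNames()` before relying on them.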
 


On Fri, 23/03/2018 at 18:13 +0100, Rosellini, Luca wrote:
> Hi Nikolay,
> let's say I have the following ignite config (it's the default config with persistent store enabled):
> 
> $ cat /Users/luca/java/apache-ignite-fabric-2.4.0-bin/config/default-config.xml
> <?xml version="1.0" encoding="UTF-8"?>
> <beans xmlns="http://www.springframework.org/schema/beans"
>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>        xsi:schemaLocation="
>        http://www.springframework.org/schema/beans
>        http://www.springframework.org/schema/beans/spring-beans.xsd">
> 
>     <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
>         <property name="dataStorageConfiguration">
>         <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
>             <property name="defaultDataRegionConfiguration">
>                 <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
>                     <property name="persistenceEnabled" value="true"/>
>                 </bean>
>             </property>
>         </bean>
>         </property>
>     </bean>
> </beans>
> 
> I have the following JSON data file:
> {"uuid": "56734043-912F-44AF-9AB0-929482ECCC60","name": "Leonardo","surname": "Da Vinci"}
> {"uuid": "C2A9BCD3-FE93-4E44-98D9-043086A154A3","name": "Raffaello","surname": "Sanzio"}
> Using the following Spark code I load it as a DataFrame:
> final class DataFrameWriteTest extends FunSuite {
>   
>   private lazy val spark = SparkSession.builder()
>     .appName("Example Program")
>     .master("local[1]")
>     .getOrCreate()
>   
>   test("Test write dataframe") {
>     val cfg = "/Users/luca/java/apache-ignite-fabric-2.4.0-bin/config/default-config.xml"
> 
>     val df = spark.read.json("/Users/luca/projects/keedio/spark-sbt-skel/src/test/resources/data.json")
>     
>     df.printSchema()
>     df.write
>       .format(FORMAT_IGNITE)
>       .option(OPTION_CONFIG_FILE, cfg)
>       .option(OPTION_TABLE, "PERSON")
>       .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "uuid")
>       .mode(SaveMode.Append)
>       .save()
>     
>   }
> }
> The DataFrame is successfully saved in an Ignite cache called SQL_PUBLIC_PERSON, and I can successfully query it via JDBC (using DBeaver, for example).
> 
> 
> So now I'd like to access the cache using the Ignite key/value abstraction, with the following Java code:
> public class JIgniteCacheTest {
>   @Test
>   public void testIgniteCaches() {
>     Ignite ignite = Ignition.start("/Users/luca/java/apache-ignite-fabric-2.4.0-bin/config/default-config.xml");
> 
>     IgniteCache<String, Object> cache = ignite.getOrCreateCache("SQL_PUBLIC_PERSON");
>     
>     // OK
>     System.out.println(cache.containsKey("C2A9BCD3-FE93-4E44-98D9-043086A154A3"));
> 
>     // throws exception
>     System.out.println(cache.get("C2A9BCD3-FE93-4E44-98D9-043086A154A3"));
>   }
> }
> As you can see, I am pointing to the same Ignite configuration.
> The call cache.containsKey(...) returns successfully, while the call to cache.get(...) throws the following exception:
> javax.cache.CacheException: class org.apache.ignite.IgniteCheckedException: SQL_PUBLIC_PERSON_eafe5efa_a96b_4f76_a72b_7b9e4c2d3fcc
> 
>     at org.apache.ignite.internal.processors.cache.GridCacheUtils.convertToCacheException(GridCacheUtils.java:1294)
>     at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.cacheException(IgniteCacheProxyImpl.java:1673)
>     at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.get(IgniteCacheProxyImpl.java:852)
>     at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.get(GatewayProtectedCacheProxy.java:676)
>     at com.keedio.ignite.JIgniteCacheTest.testIgniteCaches(JIgniteCacheTest.java:22)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>     at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: class org.apache.ignite.IgniteCheckedException: SQL_PUBLIC_PERSON_eafe5efa_a96b_4f76_a72b_7b9e4c2d3fcc
>     at org.apache.ignite.internal.util.IgniteUtils.cast(IgniteUtils.java:7244)
>     at org.apache.ignite.internal.util.future.GridFutureAdapter.resolve(GridFutureAdapter.java:259)
>     at org.apache.ignite.internal.util.future.GridFutureAdapter.get0(GridFutureAdapter.java:190)
>     at org.apache.ignite.internal.util.future.GridFutureAdapter.get(GridFutureAdapter.java:140)
>     at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get0(GridCacheAdapter.java:4572)
>     at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:4546)
>     at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:1347)
>     at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.get(IgniteCacheProxyImpl.java:849)
>     ... 24 more
> Caused by: java.lang.ClassNotFoundException: SQL_PUBLIC_PERSON_eafe5efa_a96b_4f76_a72b_7b9e4c2d3fcc
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:8548)
>     at org.apache.ignite.internal.MarshallerContextImpl.getClass(MarshallerContextImpl.java:340)
>     at org.apache.ignite.internal.binary.BinaryContext.descriptorForTypeId(BinaryContext.java:687)
>     at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize0(BinaryReaderExImpl.java:1755)
>     at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1714)
>     at org.apache.ignite.internal.binary.BinaryObjectImpl.deserializeValue(BinaryObjectImpl.java:797)
>     at org.apache.ignite.internal.binary.BinaryObjectImpl.value(BinaryObjectImpl.java:143)
>     at org.apache.ignite.internal.processors.cache.CacheObjectUtils.unwrapBinary(CacheObjectUtils.java:177)
>     at org.apache.ignite.internal.processors.cache.CacheObjectUtils.unwrapBinaryIfNeeded(CacheObjectUtils.java:67)
>     at org.apache.ignite.internal.processors.cache.CacheObjectContext.unwrapBinaryIfNeeded(CacheObjectContext.java:125)
>     at org.apache.ignite.internal.processors.cache.GridCacheContext.unwrapBinaryIfNeeded(GridCacheContext.java:1745)
>     at org.apache.ignite.internal.processors.cache.GridCacheContext.unwrapBinaryIfNeeded(GridCacheContext.java:1733)
>     at org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture.setResult(GridPartitionedSingleGetFuture.java:679)
>     at org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture.onResult(GridPartitionedSingleGetFuture.java:536)
>     at org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter.processNearSingleGetResponse(GridDhtCacheAdapter.java:349)
>     at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.access$1400(GridDhtAtomicCache.java:130)
>     at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$15.apply(GridDhtAtomicCache.java:422)
>     at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$15.apply(GridDhtAtomicCache.java:417)
>     at org.apache.ignite.internal.processors.cache.GridCacheIoManager.processMessage(GridCacheIoManager.java:1060)
>     at org.apache.ignite.internal.processors.cache.GridCacheIoManager.onMessage0(GridCacheIoManager.java:579)
>     at org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:378)
>     at org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:304)
>     at org.apache.ignite.internal.processors.cache.GridCacheIoManager.access$100(GridCacheIoManager.java:99)
>     at org.apache.ignite.internal.processors.cache.GridCacheIoManager$1.onMessage(GridCacheIoManager.java:293)
>     at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1555)
>     at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1183)
>     at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:126)
>     at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1090)
>     at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:505)
>     at java.lang.Thread.run(Thread.java:748)
> 
> Hope you can help me shed some light on this.
> 
> Thanks,
> Luca
> 
> 
> 2018-03-23 14:33 GMT+01:00 Nikolay Izhikov <ni...@apache.org>:
> > Hello, Luca.
> > 
> > Can you attach a simple reproducer or a code snippet that causes the exception?
> > 
> > On Fri, 23/03/2018 at 14:31 +0100, Rosellini, Luca wrote:
> > > Hi all,
> > > I am using Apache Ignite 2.4 and I've successfully saved a Spark Dataframe as a SQL table in the Ignite caching layer.
> > >
> > > I am trying to access the data from an external Java program (completely unrelated to the Spark Job that produced and saved the table) using the Cache API, as if it were a key/value store.
> > >
> > > The table, called 'PERSON', has a primary key field called UUID and maps to an Ignite cache called SQL_PUBLIC_PERSON.
> > >
> > > Using the Ignite Cache API, I can check that a specific entry exists in the cache by calling:
> > > cache.containsKey(...)
> > >
> > > However, if I try to get the value by calling cache.get(...) for a specific key, I get a ClassNotFoundException (the full stack trace is attached).
> > >
> > > My guess is that Ignite dynamically generated a schema bean for my DataFrame when saving the DataFrame from Spark.
> > > Since the generated bean class name also seems to follow some internal rule (in this example it's 'SQL_PUBLIC_PERSON_da18b6a2_8b41_4c34_9451_6fd9ace8e73d'), I am not sure whether this usage pattern makes sense at all.
> > >
> > > I am very new to Apache Ignite, so I apologize if this is a silly question, but I could not find any clue in the official documentation.
> > >
> > > Thanks,
> > > Luca
> 
> 

Re: Saving a DataFrame from Spark and accessing data as key/value externally

Posted by "Rosellini, Luca" <lr...@keedio.com>.
Hi Nikolay,
let's say I have the following Ignite config (it's the default config with
persistence enabled):

$ cat /Users/luca/java/apache-ignite-fabric-2.4.0-bin/config/default-config.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="dataStorageConfiguration">
            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                <property name="defaultDataRegionConfiguration">
                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                        <property name="persistenceEnabled" value="true"/>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

I have the following JSON data file:

{"uuid": "56734043-912F-44AF-9AB0-929482ECCC60","name": "Leonardo","surname": "Da Vinci"}
{"uuid": "C2A9BCD3-FE93-4E44-98D9-043086A154A3","name": "Raffaello","surname": "Sanzio"}

Using the following Spark code I load it as a DataFrame:

import org.apache.ignite.spark.IgniteDataFrameSettings._
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.FunSuite

final class DataFrameWriteTest extends FunSuite {

  private lazy val spark = SparkSession.builder()
    .appName("Example Program")
    .master("local[1]")
    .getOrCreate()

  test("Test write dataframe") {
    val cfg = "/Users/luca/java/apache-ignite-fabric-2.4.0-bin/config/default-config.xml"

    val df = spark.read.json("/Users/luca/projects/keedio/spark-sbt-skel/src/test/resources/data.json")

    df.printSchema()
    // Save as Ignite SQL table PERSON, with "uuid" as the primary key
    df.write
      .format(FORMAT_IGNITE)
      .option(OPTION_CONFIG_FILE, cfg)
      .option(OPTION_TABLE, "PERSON")
      .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "uuid")
      .mode(SaveMode.Append)
      .save()
  }
}

The DataFrame is successfully saved in an Ignite cache called
SQL_PUBLIC_PERSON, and I can successfully query it via JDBC (using DBeaver,
for example).


So now I'd like to access the cache using the Ignite key/value abstraction,
with the following Java code:

package com.keedio.ignite;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.junit.Test;

public class JIgniteCacheTest {
  @Test
  public void testIgniteCaches() {
    Ignite ignite = Ignition.start("/Users/luca/java/apache-ignite-fabric-2.4.0-bin/config/default-config.xml");

    IgniteCache<String, Object> cache = ignite.getOrCreateCache("SQL_PUBLIC_PERSON");

    // OK
    System.out.println(cache.containsKey("C2A9BCD3-FE93-4E44-98D9-043086A154A3"));

    // throws exception
    System.out.println(cache.get("C2A9BCD3-FE93-4E44-98D9-043086A154A3"));
  }
}

As you can see, I am pointing to the same Ignite configuration.
The call cache.containsKey(...) returns successfully, while the call to
cache.get(...) throws the following exception:
javax.cache.CacheException: class org.apache.ignite.IgniteCheckedException: SQL_PUBLIC_PERSON_eafe5efa_a96b_4f76_a72b_7b9e4c2d3fcc
    at org.apache.ignite.internal.processors.cache.GridCacheUtils.convertToCacheException(GridCacheUtils.java:1294)
    at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.cacheException(IgniteCacheProxyImpl.java:1673)
    at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.get(IgniteCacheProxyImpl.java:852)
    at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.get(GatewayProtectedCacheProxy.java:676)
    at com.keedio.ignite.JIgniteCacheTest.testIgniteCaches(JIgniteCacheTest.java:22)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: class org.apache.ignite.IgniteCheckedException: SQL_PUBLIC_PERSON_eafe5efa_a96b_4f76_a72b_7b9e4c2d3fcc
    at org.apache.ignite.internal.util.IgniteUtils.cast(IgniteUtils.java:7244)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.resolve(GridFutureAdapter.java:259)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.get0(GridFutureAdapter.java:190)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.get(GridFutureAdapter.java:140)
    at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get0(GridCacheAdapter.java:4572)
    at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:4546)
    at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:1347)
    at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.get(IgniteCacheProxyImpl.java:849)
    ... 24 more
Caused by: java.lang.ClassNotFoundException: SQL_PUBLIC_PERSON_eafe5efa_a96b_4f76_a72b_7b9e4c2d3fcc
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:8548)
    at org.apache.ignite.internal.MarshallerContextImpl.getClass(MarshallerContextImpl.java:340)
    at org.apache.ignite.internal.binary.BinaryContext.descriptorForTypeId(BinaryContext.java:687)
    at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize0(BinaryReaderExImpl.java:1755)
    at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1714)
    at org.apache.ignite.internal.binary.BinaryObjectImpl.deserializeValue(BinaryObjectImpl.java:797)
    at org.apache.ignite.internal.binary.BinaryObjectImpl.value(BinaryObjectImpl.java:143)
    at org.apache.ignite.internal.processors.cache.CacheObjectUtils.unwrapBinary(CacheObjectUtils.java:177)
    at org.apache.ignite.internal.processors.cache.CacheObjectUtils.unwrapBinaryIfNeeded(CacheObjectUtils.java:67)
    at org.apache.ignite.internal.processors.cache.CacheObjectContext.unwrapBinaryIfNeeded(CacheObjectContext.java:125)
    at org.apache.ignite.internal.processors.cache.GridCacheContext.unwrapBinaryIfNeeded(GridCacheContext.java:1745)
    at org.apache.ignite.internal.processors.cache.GridCacheContext.unwrapBinaryIfNeeded(GridCacheContext.java:1733)
    at org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture.setResult(GridPartitionedSingleGetFuture.java:679)
    at org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture.onResult(GridPartitionedSingleGetFuture.java:536)
    at org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter.processNearSingleGetResponse(GridDhtCacheAdapter.java:349)
    at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.access$1400(GridDhtAtomicCache.java:130)
    at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$15.apply(GridDhtAtomicCache.java:422)
    at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$15.apply(GridDhtAtomicCache.java:417)
    at org.apache.ignite.internal.processors.cache.GridCacheIoManager.processMessage(GridCacheIoManager.java:1060)
    at org.apache.ignite.internal.processors.cache.GridCacheIoManager.onMessage0(GridCacheIoManager.java:579)
    at org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:378)
    at org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:304)
    at org.apache.ignite.internal.processors.cache.GridCacheIoManager.access$100(GridCacheIoManager.java:99)
    at org.apache.ignite.internal.processors.cache.GridCacheIoManager$1.onMessage(GridCacheIoManager.java:293)
    at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1555)
    at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1183)
    at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:126)
    at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1090)
    at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:505)
    at java.lang.Thread.run(Thread.java:748)
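The stack trace shows the failure happens in `unwrapBinaryIfNeeded`, when Ignite tries to turn the stored binary object into an instance of the generated class. A minimal sketch of reading the same entry without that class, assuming the `withKeepBinary()` approach suggested by Nikolay Izhikov elsewhere in this thread: the value comes back as a `BinaryObject` and its fields are read by name. The class name `KeepBinaryGet`, the `toMap` helper and the command-line arguments are hypothetical, and starting as a client node (`Ignition.setClientMode(true)`) is an assumption so this program does not join the cluster as a data-holding server.

```java
import java.util.Map;
import java.util.TreeMap;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;

public class KeepBinaryGet {
    // Copies every field of a binary row into a sorted map; no generated class is needed.
    static Map<String, Object> toMap(BinaryObject row) {
        Map<String, Object> fields = new TreeMap<>();
        for (String name : row.type().fieldNames())
            fields.put(name, row.field(name));
        return fields;
    }

    public static void main(String[] args) {
        // Hypothetical arguments: args[0] = Spring XML config path, args[1] = UUID key.
        Ignition.setClientMode(true); // assumption: join as a client that holds no data
        try (Ignite ignite = Ignition.start(args[0])) {
            IgniteCache<Object, Object> plain = ignite.cache("SQL_PUBLIC_PERSON");
            if (plain == null) {
                System.out.println("cache SQL_PUBLIC_PERSON does not exist");
                return;
            }
            // withKeepBinary() returns values as BinaryObject instead of deserializing them.
            IgniteCache<String, BinaryObject> cache = plain.withKeepBinary();
            BinaryObject row = cache.get(args[1]);
            if (row == null) {
                System.out.println("no entry for key " + args[1]);
                return;
            }
            System.out.println(row.type().typeName()); // the generated value type name
            System.out.println(toMap(row));
        }
    }
}
```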

Hope you can help me shed some light on this.

Thanks,
Luca


2018-03-23 14:33 GMT+01:00 Nikolay Izhikov <ni...@apache.org>:

> Hello, Luca.
>
> Can you attach a simple reproducer or a code snippet that causes the exception?
>

Re: Saving a DataFrame from Spark and accessing data as key/value externally

Posted by Nikolay Izhikov <ni...@apache.org>.
Hello, Luca.

Can you attach a simple reproducer or a code snippet that causes the exception?

On Fri, 23/03/2018 at 14:31 +0100, Rosellini, Luca wrote:
> Hi all,
> I am using Apache Ignite 2.4 and I've successfully saved a Spark Dataframe as a SQL table in the Ignite caching layer.
> 
> I am trying to access the data from an external Java program (completely unrelated to the Spark Job that produced and saved the table) using the Cache API, as if it were a key/value store.
> 
> The table, called 'PERSON', has a primary key field called UUID and maps to an Ignite cache called SQL_PUBLIC_PERSON.
> 
> Using the Ignite Cache API I am able to check that a specific entry exists in the cache by calling:
> cache.containsKey(...)
> 
> However, if I try to get the value by calling cache.get(...) for a specific key, I get a ClassNotFoundException (full stack trace is attached).
> 
> Now, I guess Ignite dynamically generated a schema bean for my DataFrame when saving the DataFrame itself in Spark.
> Since the generated bean class name also seems to follow some internal rule (in this example it's 'SQL_PUBLIC_PERSON_da18b6a2_8b41_4c34_9451_6fd9ace8e73d'), I am not sure this usage pattern makes sense at all.
> 
> I am very new to Apache Ignite so I'd like to apologize if this is a silly question, but I am not able to find any clue in the official documentation.
> 
> Thanks,
> Luca
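The two names in the question can be made concrete with a small pure-Java sketch. It assumes Ignite's default naming for a table created without an explicit cache name (`SQL_` + schema + `_` + table, matching `PUBLIC.PERSON` to `SQL_PUBLIC_PERSON` here). It also treats the value type name as `<cache name>_<UUID with '-' replaced by '_'>`, a shape inferred only from the two names quoted in this thread. Those two names differ (`da18b6a2...` in the question, `eafe5efa...` in the later stack trace), which suggests the suffix changes whenever the table is re-created, so shipping a hand-written class with that name would not be a stable workaround. The class and method names are hypothetical.

```java
import java.util.regex.Pattern;

public class SqlNaming {
    // Assumption: default cache name for a SQL table without an explicit cache name.
    static String defaultCacheName(String schema, String table) {
        return "SQL_" + schema + "_" + table;
    }

    // Heuristic only: "<cacheName>_" followed by a lowercase hex UUID whose
    // dashes were replaced by underscores (8_4_4_4_12 hex digits).
    static boolean looksLikeGeneratedValueType(String typeName, String cacheName) {
        String h = "[0-9a-f]";
        String uuid = h + "{8}_" + h + "{4}_" + h + "{4}_" + h + "{4}_" + h + "{12}";
        return Pattern.matches(Pattern.quote(cacheName) + "_" + uuid, typeName);
    }

    public static void main(String[] args) {
        String cache = defaultCacheName("PUBLIC", "PERSON");
        System.out.println(cache);
        System.out.println(looksLikeGeneratedValueType(
            "SQL_PUBLIC_PERSON_da18b6a2_8b41_4c34_9451_6fd9ace8e73d", cache));
    }
}
```

Running `main` prints `SQL_PUBLIC_PERSON` followed by `true`.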