Posted to user@ignite.apache.org by Harshal Patil <ha...@mindtickle.com> on 2019/03/25 14:17:47 UTC

Spark dataframe to Ignite write issue .

Hi,
I am running Spark 2.3.1 with Ignite 2.7.0. I have configured Postgres as
the cache persistence store. After loading the cache, I can read data from
the Ignite cache and convert it to a Spark DataFrame. But when writing back
to Ignite, I get the error below:

class org.apache.ignite.internal.processors.query.IgniteSQLException: Table "ENTITY_PLAYABLE" not found; SQL statement:
INSERT INTO ENTITY_PLAYABLE(GAMEID,PLAYABLEID,COMPANYID,VERSION,EVENTTIMESTAMP,EVENTTIMESTAMPSYS,COMPANYIDPARTITION,partitionkey) VALUES(?,?,?,?,?,?,?,?) [42102-197]
    at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.streamUpdateQuery(IgniteH2Indexing.java:1302)
    at org.apache.ignite.internal.processors.query.GridQueryProcessor$5.applyx(GridQueryProcessor.java:2206)
    at org.apache.ignite.internal.processors.query.GridQueryProcessor$5.applyx(GridQueryProcessor.java:2204)
    at org.apache.ignite.internal.util.lang.IgniteOutClosureX.apply(IgniteOutClosureX.java:36)



*Read from Ignite*:

Loading the cache:

val conf = new SparkConf()
conf.setMaster("spark://harshal-patil.local:7077")
// conf.setMaster("local[*]")
conf.setAppName("IGniteTest")
conf.set("spark.executor.heartbeatInterval", "900s")
conf.set("spark.network.timeout", "950s")
conf.set("spark.default.parallelism", "4")
conf.set("spark.cores.max", "4")
conf.set("spark.jars", "target/pack/lib/spark_ignite_cache_test_2.11-0.1.jar")

val cfg = () => ServerConfigurationFactory.createConfiguration()

Ignition.start(ServerConfigurationFactory.createConfiguration())

val ic: IgniteContext = new IgniteContext(sc, cfg)

ic.ignite().cache("EntityPlayableCache").loadCache(null.asInstanceOf[IgniteBiPredicate[_, _]])




spark.read
  .format(IgniteDataFrameSettings.FORMAT_IGNITE)
  .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, configPath)
  .option(IgniteDataFrameSettings.OPTION_TABLE, "ENTITY_PLAYABLE")
  .load()
  .select(sum("partitionkey").alias("sum"), count("gameId").as("total"))
  .collect()(0)


*Write To Ignite*:

df.write
  .format(IgniteDataFrameSettings.FORMAT_IGNITE)
  .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, configPath)
  .option(IgniteDataFrameSettings.OPTION_TABLE, "ENTITY_PLAYABLE")
  .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "gameId,playableId,companyId,version")
  .option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE, "true")
  .mode(SaveMode.Append)
  .save()



I think the problem is with Spring bean injection on the executor node.
Please help: what am I doing wrong?

Re: Spark dataframe to Ignite write issue .

Posted by Harshal Patil <ha...@mindtickle.com>.
Can anyone please help here?


Re: Spark dataframe to Ignite write issue .

Posted by Harshal Patil <ha...@mindtickle.com>.
Hi Nikolay, Denis,

If I disable the OPTION_STREAMER_ALLOW_OVERWRITE option, the write works fine:

df1.write
  .format(IgniteDataFrameSettings.FORMAT_IGNITE)
  .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, configPath)
  .option(IgniteDataFrameSettings.OPTION_TABLE, "ENTITY_PLAYABLE")
  .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "gameId,playableId,companyId,version")
  // .option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE, "true")
  .mode(SaveMode.Append)
  .save()


But after enabling IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE, I
get the error mentioned earlier intermittently. I guess it may be related to
the issue described at
https://stackoverflow.com/questions/5763747/h2-in-memory-database-table-not-found

One more thing: if I create the IgniteContext like this,

val configPath = "/Users/harshal/Downloads/Ignite23-project/src/main/resources/META-INF/Ignite23-server.xml"
val ic: IgniteContext = new IgniteContext(sc, configPath)

I am not able to inject dependencies, so I am doing this instead:

public static IgniteConfiguration createConfiguration() throws Exception {
  IgniteConfiguration cfg = new IgniteConfiguration();
  cfg.setIgniteInstanceName("Ignite23");
  TcpDiscoverySpi discovery = new TcpDiscoverySpi();
  TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
  ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47510"));
  discovery.setIpFinder(ipFinder);
  cfg.setDiscoverySpi(discovery);
  cfg.setCacheConfiguration(new CacheConfiguration[]{cacheEntityPlayableCache()});
  return cfg;
}

public static CacheConfiguration cacheEntityPlayableCache() throws Exception {
  CacheConfiguration ccfg = new CacheConfiguration();
  ccfg.setName("EntityPlayableCache");
  ccfg.setCacheMode(CacheMode.PARTITIONED);
  ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
  CacheJdbcPojoStoreFactory cacheStoreFactory = new CacheJdbcPojoStoreFactory();
  cacheStoreFactory.setDataSourceFactory(new Factory<DataSource>() {
    public DataSource create() {
      return ServerConfigurationFactory.DataSources.INSTANCE_dsPostgreSQL_Rein;
    }
  });
  cacheStoreFactory.setDialect(new BasicJdbcDialect());
  cacheStoreFactory.setTypes(new JdbcType[]{jdbcTypeEntityPlayable(ccfg.getName())});
  cacheStoreFactory.setSqlEscapeAll(true);
  ccfg.setCacheStoreFactory(cacheStoreFactory);
  ccfg.setReadThrough(true);
  ccfg.setWriteThrough(true);
  ArrayList<QueryEntity> qryEntities = new ArrayList();
  QueryEntity qryEntity = new QueryEntity();
  qryEntity.setKeyType("com.gmail.patil.j.harshal.model.EntityPlayableKey");
  qryEntity.setValueType("com.gmail.patil.j.harshal.model.EntityPlayable");
  qryEntity.setTableName("entity_playable");
  HashSet<String> keyFields = new HashSet();
  keyFields.add("gameId");
  keyFields.add("playableid");
  keyFields.add("companyId");
  keyFields.add("version");
  qryEntity.setKeyFields(keyFields);
  LinkedHashMap<String, String> fields = new LinkedHashMap();
  fields.put("gameId", "java.lang.Long");
  fields.put("playableid", "java.lang.Long");
  fields.put("companyId", "java.lang.Long");
  fields.put("version", "java.lang.Integer");
  fields.put("eventTimestamp", "java.sql.Timestamp");
  fields.put("eventTimestampSys", "java.lang.Long");
  fields.put("companyIdPartition", "java.lang.Long");
  fields.put("partitionkey", "java.lang.Long");
  qryEntity.setFields(fields);
  ArrayList<QueryIndex> indexes = new ArrayList();
  QueryIndex index = new QueryIndex();
  index.setName("company_id_partition_hash_entity_playable_hash");
  index.setIndexType(QueryIndexType.SORTED);
  LinkedHashMap<String, Boolean> indFlds = new LinkedHashMap();
  indFlds.put("companyIdPartition", false);
  index.setFields(indFlds);
  indexes.add(index);
  index = new QueryIndex();
  index.setName("companyId_entity_playable_hash");
  index.setIndexType(QueryIndexType.SORTED);
  indFlds = new LinkedHashMap();
  indFlds.put("companyId", false);
  index.setFields(indFlds);
  indexes.add(index);
  index = new QueryIndex();
  index.setName("gameId_entity_playable_hash");
  index.setIndexType(QueryIndexType.SORTED);
  indFlds = new LinkedHashMap();
  indFlds.put("gameId", false);
  index.setFields(indFlds);
  indexes.add(index);
  index = new QueryIndex();
  index.setName("company_id_partition_entity_playable_normal");
  index.setIndexType(QueryIndexType.SORTED);
  indFlds = new LinkedHashMap();
  indFlds.put("companyIdPartition", false);
  index.setFields(indFlds);
  indexes.add(index);
  index = new QueryIndex();
  index.setName("companyId_entity_playable_normal");
  index.setIndexType(QueryIndexType.SORTED);
  indFlds = new LinkedHashMap();
  indFlds.put("companyId", false);
  index.setFields(indFlds);
  indexes.add(index);
  index = new QueryIndex();
  index.setName("gameId_entity_playable_normal");
  index.setIndexType(QueryIndexType.SORTED);
  indFlds = new LinkedHashMap();
  indFlds.put("gameId", false);
  index.setFields(indFlds);
  indexes.add(index);
  qryEntity.setIndexes(indexes);
  qryEntities.add(qryEntity);
  ccfg.setQueryEntities(qryEntities);
  return ccfg;
}

val cfg = () => ServerConfigurationFactory.createConfiguration()

Ignition.start(cfg())

val ic : IgniteContext = new IgniteContext(sc,  cfg)

which can inject the cache configurations.

<?xml version="1.0" encoding="UTF-8"?>

<!-- This file was generated by Ignite Web Console (03/19/2019, 23:43) -->

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/util
                           http://www.springframework.org/schema/util/spring-util.xsd">
   <!-- Load external properties file. -->
   <!--<bean id="placeholderConfig"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">-->
   <!--<property name="location" value="classpath:secret.properties"/>-->
   <!--</bean>-->

   <bean id="dsPostgreSQL_Rein" class="org.postgresql.ds.PGPoolingDataSource">
      <property name="url"
value="jdbc:postgresql://analyticstrack.caumccqvmegm.ap-southeast-1.rds.amazonaws.com:5432/rein"/>
      <property name="user" value="postgres"/>
      <property name="password" value="postgres"/>
   </bean>

   <bean class="org.apache.ignite.configuration.IgniteConfiguration">
      <property name="igniteInstanceName" value="Ignite23"/>

      <property name="discoverySpi">
         <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
            <property name="ipFinder">
               <bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                  <property name="addresses">
                     <list>
                        <value>127.0.0.1:47500..47510</value>
                     </list>
                  </property>
               </bean>
            </property>
         </bean>
      </property>

      <property name="cacheConfiguration">
         <list>
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
               <property name="name" value="EntityPlayableCache"/>
               <property name="cacheMode" value="PARTITIONED"/>
               <property name="atomicityMode" value="ATOMIC"/>

               <property name="cacheStoreFactory">
                  <bean
class="org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory">
                     <property name="dataSourceBean" value="dsPostgreSQL_Rein"/>
                     <property name="dialect">
                        <bean
class="org.apache.ignite.cache.store.jdbc.dialect.BasicJdbcDialect">
                        </bean>
                     </property>
                     <property name="sqlEscapeAll" value="true"></property>
                     <property name="batchSize" value="1000"/>

                     <property name="types">
                        <list>
                           <bean
class="org.apache.ignite.cache.store.jdbc.JdbcType">
                              <property name="cacheName"
value="EntityPlayableCache"/>
                              <property name="keyType"
value="com.gmail.patil.j.harshal.model.EntityPlayableKey"/>
                              <property name="valueType"
value="com.gmail.patil.j.harshal.model.EntityPlayable"/>
                              <property name="databaseSchema" value="public"/>
                              <property name="databaseTable"
value="entity_playable"/>

                              <property name="keyFields">
                                 <list>
                                    <bean
class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                       <constructor-arg>
                                          <util:constant
static-field="java.sql.Types.BIGINT"/>
                                       </constructor-arg>
                                       <constructor-arg value="gameId"/>
                                       <constructor-arg value="long"/>
                                       <constructor-arg value="gameId"/>
                                    </bean>

                                    <bean
class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                       <constructor-arg>
                                          <util:constant
static-field="java.sql.Types.BIGINT"/>
                                       </constructor-arg>
                                       <constructor-arg value="playableId"/>
                                       <constructor-arg value="long"/>
                                       <constructor-arg value="playableId"/>
                                    </bean>

                                    <bean
class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                       <constructor-arg>
                                          <util:constant
static-field="java.sql.Types.BIGINT"/>
                                       </constructor-arg>
                                       <constructor-arg value="companyId"/>
                                       <constructor-arg value="long"/>
                                       <constructor-arg value="companyId"/>
                                    </bean>

                                    <bean
class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                       <constructor-arg>
                                          <util:constant
static-field="java.sql.Types.INTEGER"/>
                                       </constructor-arg>
                                       <constructor-arg value="version"/>
                                       <constructor-arg value="int"/>
                                       <constructor-arg value="version"/>
                                    </bean>
                                 </list>
                              </property>

                              <property name="valueFields">
                                 <list>
                                    <bean
class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                       <constructor-arg>
                                          <util:constant
static-field="java.sql.Types.TIMESTAMP"/>
                                       </constructor-arg>
                                       <constructor-arg value="eventTimestamp"/>
                                       <constructor-arg
value="java.sql.Timestamp"/>
                                       <constructor-arg value="eventTimestamp"/>
                                    </bean>

                                    <bean
class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                       <constructor-arg>
                                          <util:constant
static-field="java.sql.Types.BIGINT"/>
                                       </constructor-arg>
                                       <constructor-arg
value="eventTimestampSys"/>
                                       <constructor-arg value="java.lang.Long"/>
                                       <constructor-arg
value="eventTimestampSys"/>
                                    </bean>

                                    <bean
class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                       <constructor-arg>
                                          <util:constant
static-field="java.sql.Types.BIGINT"/>
                                       </constructor-arg>
                                       <constructor-arg
value="companyIdPartition"/>
                                       <constructor-arg value="java.lang.Long"/>
                                       <constructor-arg
value="companyIdPartition"/>
                                    </bean>

                                    <bean
class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                       <constructor-arg>
                                          <util:constant
static-field="java.sql.Types.BIGINT"/>
                                       </constructor-arg>
                                       <constructor-arg value="partitionkey"/>
                                       <constructor-arg value="java.lang.Long"/>
                                       <constructor-arg value="partitionkey"/>
                                    </bean>
                                 </list>
                              </property>
                           </bean>
                        </list>
                     </property>
                  </bean>
               </property>

               <property name="readThrough" value="true"/>
               <property name="writeBehindEnabled" value="true"/>
               <property name="writeBehindBatchSize" value="1000"/>
               <property name="writeBehindFlushSize" value="0"/>

               <property name="queryEntities">
                  <list>
                     <bean class="org.apache.ignite.cache.QueryEntity">
                        <property name="keyType"
value="com.gmail.patil.j.harshal.model.EntityPlayableKey"/>
                        <property name="valueType"
value="com.gmail.patil.j.harshal.model.EntityPlayable"/>
                        <property name="tableName" value="entity_playable"/>

                        <property name="keyFields">
                           <list>
                              <value>gameId</value>
                              <value>playableId</value>
                              <value>companyId</value>
                              <value>version</value>
                           </list>
                        </property>

                        <property name="fields">
                           <map>
                              <entry key="gameId" value="java.lang.Long"/>
                              <entry key="playableId" value="java.lang.Long"/>
                              <entry key="companyId" value="java.lang.Long"/>
                              <entry key="version" value="java.lang.Integer"/>
                              <entry key="eventTimestamp"
value="java.sql.Timestamp"/>
                              <entry key="eventTimestampSys"
value="java.lang.Long"/>
                              <entry key="companyIdPartition"
value="java.lang.Long"/>
                              <entry key="partitionkey" value="java.lang.Long"/>
                           </map>
                        </property>

                        <property name="indexes">
                           <list>
                              <bean class="org.apache.ignite.cache.QueryIndex">
                                 <property name="name"
value="company_id_partition_hash_entity_playable_hash"/>
                                 <property name="indexType" value="SORTED"/>

                                 <property name="fields">
                                    <map>
                                       <entry key="companyIdPartition"
value="false"/>
                                    </map>
                                 </property>
                              </bean>

                              <bean class="org.apache.ignite.cache.QueryIndex">
                                 <property name="name"
value="companyId_entity_playable_hash"/>
                                 <property name="indexType" value="SORTED"/>

                                 <property name="fields">
                                    <map>
                                       <entry key="companyId" value="false"/>
                                    </map>
                                 </property>
                              </bean>

                              <bean class="org.apache.ignite.cache.QueryIndex">
                                 <property name="name"
value="gameId_entity_playable_hash"/>
                                 <property name="indexType" value="SORTED"/>

                                 <property name="fields">
                                    <map>
                                       <entry key="gameId" value="false"/>
                                    </map>
                                 </property>
                              </bean>

                              <bean class="org.apache.ignite.cache.QueryIndex">
                                 <property name="name"
value="company_id_partition_entity_playable_normal"/>
                                 <property name="indexType" value="SORTED"/>

                                 <property name="fields">
                                    <map>
                                       <entry key="companyIdPartition"
value="false"/>
                                    </map>
                                 </property>
                              </bean>

                              <bean class="org.apache.ignite.cache.QueryIndex">
                                 <property name="name"
value="companyId_entity_playable_normal"/>
                                 <property name="indexType" value="SORTED"/>

                                 <property name="fields">
                                    <map>
                                       <entry key="companyId" value="false"/>
                                    </map>
                                 </property>
                              </bean>

                              <bean class="org.apache.ignite.cache.QueryIndex">
                                 <property name="name"
value="gameId_entity_playable_normal"/>
                                 <property name="indexType" value="SORTED"/>

                                 <property name="fields">
                                    <map>
                                       <entry key="gameId" value="false"/>
                                    </map>
                                 </property>
                              </bean>
                           </list>
                        </property>
                     </bean>
                  </list>
               </property>
            </bean>
         </list>
      </property>
   </bean>
</beans>


The above is my XML file. Can you suggest whether the issue is caused by my
using the XML file when writing the DataFrame, while the cache was loaded
through an IgniteContext into which I injected the cache configuration
through a function call?


On Tue, Mar 26, 2019 at 4:05 PM Nikolay Izhikov <ni...@apache.org> wrote:

> Hello, Harshal
>
> Can you, please, share your Ignite config?
> Especially, "*ENTITY_PLAYABLE*" cache definition
>
> Tue, Mar 26, 2019 at 05:35, Denis Magda <dm...@apache.org>:
>
>> Hi, as far as I can guess from the shared details, you should pass the
>> IgniteCache name as a SQL schema if SQL metadata was configured via XML or
>> annotations. Try this "INSERT INTO cacheName.ENTITY_PLAYABLE".
>>
>> -
>> Denis
>>

Re: Spark dataframe to Ignite write issue .

Posted by Nikolay Izhikov <ni...@apache.org>.
Hello, Harshal

Can you, please, share your Ignite config?
Especially, "*ENTITY_PLAYABLE*" cache definition

Tue, Mar 26, 2019 at 05:35, Denis Magda <dm...@apache.org>:

> Hi, as far as I can guess from the shared details, you should pass the
> IgniteCache name as a SQL schema if SQL metadata was configured via XML or
> annotations. Try this "INSERT INTO cacheName.ENTITY_PLAYABLE".
>
> -
> Denis
>

Re: Spark dataframe to Ignite write issue .

Posted by Denis Magda <dm...@apache.org>.
Hi, as far as I can guess from the shared details, you should pass the
IgniteCache name as a SQL schema if SQL metadata was configured via XML or
annotations. Try this "INSERT INTO cacheName.ENTITY_PLAYABLE".

-
Denis
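
An illustrative sketch of the suggestion above, not part of the original reply. As far as I understand Ignite's defaults, a cache configured through a QueryEntity gets a SQL schema named after the cache, and a mixed-case schema name has to be double-quoted to keep its case. The helper class and method names below (QualifiedInsert, qualify, insertSql) are hypothetical and only build the statement text; whether the Spark DataFrame writer accepts a schema-qualified name in OPTION_TABLE is not confirmed in this thread.

```java
import java.util.Collections;

// Hypothetical helper for illustration only; not part of Ignite or Spark.
final class QualifiedInsert {

    // Prefixes the table with the cache name as a quoted schema,
    // so the mixed case of the cache name is preserved.
    static String qualify(String cacheName, String table) {
        return "\"" + cacheName.replace("\"", "\"\"") + "\"." + table;
    }

    // Builds a parameterized INSERT against the schema-qualified table.
    static String insertSql(String cacheName, String table, String... columns) {
        String cols = String.join(",", columns);
        String params = String.join(",", Collections.nCopies(columns.length, "?"));
        return "INSERT INTO " + qualify(cacheName, table)
            + "(" + cols + ") VALUES(" + params + ")";
    }

    public static void main(String[] args) {
        // Cache and table names are taken from the configuration shared in this thread.
        System.out.println(insertSql("EntityPlayableCache", "ENTITY_PLAYABLE",
            "GAMEID", "PLAYABLEID", "COMPANYID", "VERSION"));
    }
}
```

Compared with the failing statement in the stack trace, the only difference is the "EntityPlayableCache". prefix in front of ENTITY_PLAYABLE.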

