Posted to user@ignite.apache.org by Harshal Patil <ha...@mindtickle.com> on 2019/03/25 14:17:47 UTC
Spark DataFrame to Ignite write issue
Hi,
I am running Spark 2.3.1 with Ignite 2.7.0. I have configured Postgres as the
cache persistence store. After loading the cache, I can read data from the
Ignite cache and convert it to a Spark DataFrame. But when writing back to
Ignite, I get the error below:
class org.apache.ignite.internal.processors.query.IgniteSQLException: Table "ENTITY_PLAYABLE" not found; SQL statement:
INSERT INTO ENTITY_PLAYABLE(GAMEID,PLAYABLEID,COMPANYID,VERSION,EVENTTIMESTAMP,EVENTTIMESTAMPSYS,COMPANYIDPARTITION,partitionkey) VALUES(?,?,?,?,?,?,?,?) [42102-197]
    at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.streamUpdateQuery(IgniteH2Indexing.java:1302)
    at org.apache.ignite.internal.processors.query.GridQueryProcessor$5.applyx(GridQueryProcessor.java:2206)
    at org.apache.ignite.internal.processors.query.GridQueryProcessor$5.applyx(GridQueryProcessor.java:2204)
    at org.apache.ignite.internal.util.lang.IgniteOutClosureX.apply(IgniteOutClosureX.java:36)
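The trailing [42102-197] in that message is H2's own suffix: the H2 error code followed by the H2 build number. The small sketch below pulls both numbers out of such a message. The only code it maps to a name is 42102, which, as far as we know, is TABLE_OR_VIEW_NOT_FOUND_1 in H2's ErrorCode class; that suggests the failure comes from H2's table lookup inside Ignite rather than from Postgres or Spark. The class and method names are illustrative only, not part of Ignite or H2.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class H2ErrorSuffix {
    // H2 ends its exception messages with "[<errorCode>-<buildId>]".
    private static final Pattern SUFFIX = Pattern.compile("\\[(\\d+)-(\\d+)\\]\\s*$");

    // Only the code that appears in this thread; not a complete table of H2 codes.
    private static final Map<Integer, String> KNOWN_CODES =
            Map.of(42102, "TABLE_OR_VIEW_NOT_FOUND_1");

    /** Returns {errorCode, buildId}, or null when the message carries no H2 suffix. */
    static int[] parse(String message) {
        Matcher m = SUFFIX.matcher(message);
        if (!m.find()) {
            return null;
        }
        return new int[] {Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2))};
    }

    static String describe(int errorCode) {
        return KNOWN_CODES.getOrDefault(errorCode, "unknown");
    }

    public static void main(String[] args) {
        String msg = "Table \"ENTITY_PLAYABLE\" not found; SQL statement:\n"
                + "INSERT INTO ENTITY_PLAYABLE(...) VALUES(?,?,?,?,?,?,?,?) [42102-197]";
        int[] parsed = parse(msg);
        // prints: error code 42102 (TABLE_OR_VIEW_NOT_FOUND_1), H2 build 197
        System.out.println("error code " + parsed[0] + " (" + describe(parsed[0]) + "), H2 build " + parsed[1]);
    }
}
```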
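*Read from Ignite*:
Loading the cache:
import org.apache.ignite.Ignition
import org.apache.ignite.lang.IgniteBiPredicate
import org.apache.ignite.spark.{IgniteContext, IgniteDataFrameSettings}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, sum}

val conf = new SparkConf()
conf.setMaster("spark://harshal-patil.local:7077")
// conf.setMaster("local[*]")
conf.setAppName("IGniteTest")
conf.set("spark.executor.heartbeatInterval", "900s")
conf.set("spark.network.timeout", "950s")
conf.set("spark.default.parallelism", "4")
conf.set("spark.cores.max", "4")
conf.set("spark.jars", "target/pack/lib/spark_ignite_cache_test_2.11-0.1.jar")

val spark = SparkSession.builder().config(conf).getOrCreate()
val sc = spark.sparkContext

// ServerConfigurationFactory is my own configuration class (shown later in this thread).
val cfg = () => ServerConfigurationFactory.createConfiguration()
Ignition.start(ServerConfigurationFactory.createConfiguration())
val ic: IgniteContext = new IgniteContext(sc, cfg)
ic.ignite().cache("EntityPlayableCache").loadCache(null.asInstanceOf[IgniteBiPredicate[_, _]])

Reading the table into a DataFrame (configPath points to the Spring XML configuration):
spark.read
  .format(IgniteDataFrameSettings.FORMAT_IGNITE)
  .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, configPath)
  .option(IgniteDataFrameSettings.OPTION_TABLE, "ENTITY_PLAYABLE")
  .load()
  .select(sum("partitionkey").alias("sum"), count("gameId").as("total"))
  .collect()(0)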
*Write to Ignite*:
import org.apache.spark.sql.SaveMode

// df is the DataFrame being written back to ENTITY_PLAYABLE.
df.write
  .format(IgniteDataFrameSettings.FORMAT_IGNITE)
  .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, configPath)
  .option(IgniteDataFrameSettings.OPTION_TABLE, "ENTITY_PLAYABLE")
  .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "gameId,playableId,companyId,version")
  .option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE, "true")
  .mode(SaveMode.Append)
  .save()

I think the problem is with *Spring bean injection on the executor nodes*.
Please help: what am I doing wrong?
Re: Spark DataFrame to Ignite write issue
Posted by Harshal Patil <ha...@mindtickle.com>.
Can anyone please help here?
Re: Spark DataFrame to Ignite write issue
Posted by Harshal Patil <ha...@mindtickle.com>.
Hi Nikolay, Denis,
If I disable the streamer overwrite option, it works fine:
df1.write
  .format(IgniteDataFrameSettings.FORMAT_IGNITE)
  .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, configPath)
  .option(IgniteDataFrameSettings.OPTION_TABLE, "ENTITY_PLAYABLE")
  .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "gameId,playableId,companyId,version")
  // .option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE, "true")
  .mode(SaveMode.Append)
  .save()
But after enabling IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE, I
intermittently get the error mentioned before. I guess it may be related to
https://stackoverflow.com/questions/5763747/h2-in-memory-database-table-not-found
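As we read the IgniteDataStreamer.allowOverwrite(boolean) javadoc, disabling overwrite (the streamer default) means existing cache entries are not replaced and updates are not propagated to the cache store. Assuming OPTION_STREAMER_ALLOW_OVERWRITE is passed straight to that streamer flag, a write that "works fine" with the option disabled may therefore never reach Postgres. The plain-Java model below illustrates only those two documented effects: it uses no Ignite code, HashMaps stand in for the cache and the Postgres table, and all names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class StreamerOverwriteModel {
    /**
     * Simplified model of IgniteDataStreamer.allowOverwrite semantics.
     * cache: in-memory cache contents; store: the backing table (Postgres here).
     */
    static void stream(Map<String, Long> cache, Map<String, Long> store,
                       Map<String, Long> batch, boolean allowOverwrite) {
        for (Map.Entry<String, Long> e : batch.entrySet()) {
            if (allowOverwrite) {
                // Overwrite enabled: regular put, propagated to the cache store.
                cache.put(e.getKey(), e.getValue());
                store.put(e.getKey(), e.getValue());
            } else {
                // Overwrite disabled (default): existing keys keep their value
                // and the cache store is skipped entirely.
                cache.putIfAbsent(e.getKey(), e.getValue());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Long> cache = new HashMap<>(Map.of("game-1", 1L));
        Map<String, Long> store = new HashMap<>(Map.of("game-1", 1L));
        Map<String, Long> batch = Map.of("game-1", 2L, "game-2", 3L);

        stream(cache, store, batch, false);
        System.out.println("allowOverwrite=false: cache=" + cache + " store=" + store);

        stream(cache, store, batch, true);
        System.out.println("allowOverwrite=true:  cache=" + cache + " store=" + store);
    }
}
```

If the rows must land in Postgres through the cache store, the model suggests checking the table directly after a write made with overwrite disabled.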
One more thing: if I create the IgniteContext like this,
val configPath = "/Users/harshal/Downloads/Ignite23-project/src/main/resources/META-INF/Ignite23-server.xml"
val ic: IgniteContext = new IgniteContext(sc, configPath)
I am not able to inject dependencies, so I am doing this instead:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import javax.cache.configuration.Factory;
import javax.sql.DataSource;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.QueryIndexType;
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory;
import org.apache.ignite.cache.store.jdbc.JdbcType;
import org.apache.ignite.cache.store.jdbc.dialect.BasicJdbcDialect;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;

public class ServerConfigurationFactory {
    // Excerpt: jdbcTypeEntityPlayable(...) and the DataSources holder are not shown in this post.

    public static IgniteConfiguration createConfiguration() throws Exception {
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setIgniteInstanceName("Ignite23");

        TcpDiscoverySpi discovery = new TcpDiscoverySpi();
        TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
        ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47510"));
        discovery.setIpFinder(ipFinder);
        cfg.setDiscoverySpi(discovery);

        cfg.setCacheConfiguration(new CacheConfiguration[] {cacheEntityPlayableCache()});
        return cfg;
    }

    public static CacheConfiguration cacheEntityPlayableCache() throws Exception {
        CacheConfiguration ccfg = new CacheConfiguration();
        ccfg.setName("EntityPlayableCache");
        ccfg.setCacheMode(CacheMode.PARTITIONED);
        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);

        // JDBC POJO store backed by the Postgres data source.
        CacheJdbcPojoStoreFactory cacheStoreFactory = new CacheJdbcPojoStoreFactory();
        cacheStoreFactory.setDataSourceFactory(new Factory<DataSource>() {
            @Override
            public DataSource create() {
                return ServerConfigurationFactory.DataSources.INSTANCE_dsPostgreSQL_Rein;
            }
        });
        cacheStoreFactory.setDialect(new BasicJdbcDialect());
        cacheStoreFactory.setTypes(new JdbcType[] {jdbcTypeEntityPlayable(ccfg.getName())});
        cacheStoreFactory.setSqlEscapeAll(true);
        ccfg.setCacheStoreFactory(cacheStoreFactory);
        ccfg.setReadThrough(true);
        ccfg.setWriteThrough(true);

        // SQL metadata: table entity_playable with a four-column composite key.
        QueryEntity qryEntity = new QueryEntity();
        qryEntity.setKeyType("com.gmail.patil.j.harshal.model.EntityPlayableKey");
        qryEntity.setValueType("com.gmail.patil.j.harshal.model.EntityPlayable");
        qryEntity.setTableName("entity_playable");

        Set<String> keyFields = new HashSet<>(Arrays.asList("gameId", "playableid", "companyId", "version"));
        qryEntity.setKeyFields(keyFields);

        LinkedHashMap<String, String> fields = new LinkedHashMap<>();
        fields.put("gameId", "java.lang.Long");
        fields.put("playableid", "java.lang.Long");
        fields.put("companyId", "java.lang.Long");
        fields.put("version", "java.lang.Integer");
        fields.put("eventTimestamp", "java.sql.Timestamp");
        fields.put("eventTimestampSys", "java.lang.Long");
        fields.put("companyIdPartition", "java.lang.Long");
        fields.put("partitionkey", "java.lang.Long");
        qryEntity.setFields(fields);

        List<QueryIndex> indexes = new ArrayList<>();
        indexes.add(sortedIndex("company_id_partition_hash_entity_playable_hash", "companyIdPartition"));
        indexes.add(sortedIndex("companyId_entity_playable_hash", "companyId"));
        indexes.add(sortedIndex("gameId_entity_playable_hash", "gameId"));
        indexes.add(sortedIndex("company_id_partition_entity_playable_normal", "companyIdPartition"));
        indexes.add(sortedIndex("companyId_entity_playable_normal", "companyId"));
        indexes.add(sortedIndex("gameId_entity_playable_normal", "gameId"));
        qryEntity.setIndexes(indexes);

        List<QueryEntity> qryEntities = new ArrayList<>();
        qryEntities.add(qryEntity);
        ccfg.setQueryEntities(qryEntities);
        return ccfg;
    }

    // Single-field SORTED index; the map value is the ascending flag (false, as before).
    private static QueryIndex sortedIndex(String name, String field) {
        QueryIndex index = new QueryIndex();
        index.setName(name);
        index.setIndexType(QueryIndexType.SORTED);
        LinkedHashMap<String, Boolean> indFlds = new LinkedHashMap<>();
        indFlds.put(field, false);
        index.setFields(indFlds);
        return index;
    }
}
val cfg = () => ServerConfigurationFactory.createConfiguration()
Ignition.start(cfg())
val ic: IgniteContext = new IgniteContext(sc, cfg)
This way, the cache configurations do get injected.
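IgniteContext accepts a () => IgniteConfiguration closure. As far as we understand the Ignite Spark module, it is that closure, not a finished configuration object, that is serialized and called on each executor. That would explain why a non-serializable data source created inside createConfiguration() still ends up on the executors. Assuming the path-based constructor wraps XML loading in the same kind of closure, the XML file would have to be readable at that path on every executor. The sketch below models the pattern in plain Java with hypothetical names (SerializableSupplier, NodeConfig, ship); it does not use Ignite or Spark.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class ConfigClosureShipping {
    /** A Supplier that Java can serialize, playing the role of the Scala closure. */
    interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    /** Hypothetical stand-in for IgniteConfiguration; intentionally not Serializable. */
    static final class NodeConfig {
        final String instanceName;

        NodeConfig(String instanceName) {
            this.instanceName = instanceName;
        }
    }

    /** Serializes the closure (driver side) and deserializes it (executor side). */
    @SuppressWarnings("unchecked")
    static <T> SerializableSupplier<T> ship(SerializableSupplier<T> closure) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(closure);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SerializableSupplier<T>) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("closure could not be shipped", e);
        }
    }

    public static void main(String[] args) {
        SerializableSupplier<NodeConfig> cfg = () -> new NodeConfig("Ignite23");
        SerializableSupplier<NodeConfig> onExecutor = ship(cfg);
        // The configuration object is created only after shipping; just the
        // closure crossed the serialization boundary.
        NodeConfig built = onExecutor.get();
        System.out.println(built.instanceName); // prints "Ignite23"
    }
}
```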
<?xml version="1.0" encoding="UTF-8"?>
<!-- This file was generated by Ignite Web Console (03/19/2019, 23:43) -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/util
                           http://www.springframework.org/schema/util/spring-util.xsd">
    <!-- Load external properties file. -->
    <!--<bean id="placeholderConfig" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">-->
        <!--<property name="location" value="classpath:secret.properties"/>-->
    <!--</bean>-->

    <bean id="dsPostgreSQL_Rein" class="org.postgresql.ds.PGPoolingDataSource">
        <property name="url" value="jdbc:postgresql://analyticstrack.caumccqvmegm.ap-southeast-1.rds.amazonaws.com:5432/rein"/>
        <property name="user" value="postgres"/>
        <property name="password" value="postgres"/>
    </bean>

    <bean class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="igniteInstanceName" value="Ignite23"/>

        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <value>127.0.0.1:47500..47510</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>

        <property name="cacheConfiguration">
            <list>
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="EntityPlayableCache"/>
                    <property name="cacheMode" value="PARTITIONED"/>
                    <property name="atomicityMode" value="ATOMIC"/>

                    <property name="cacheStoreFactory">
                        <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory">
                            <property name="dataSourceBean" value="dsPostgreSQL_Rein"/>
                            <property name="dialect">
                                <bean class="org.apache.ignite.cache.store.jdbc.dialect.BasicJdbcDialect"/>
                            </property>
                            <property name="sqlEscapeAll" value="true"/>
                            <property name="batchSize" value="1000"/>
                            <property name="types">
                                <list>
                                    <bean class="org.apache.ignite.cache.store.jdbc.JdbcType">
                                        <property name="cacheName" value="EntityPlayableCache"/>
                                        <property name="keyType" value="com.gmail.patil.j.harshal.model.EntityPlayableKey"/>
                                        <property name="valueType" value="com.gmail.patil.j.harshal.model.EntityPlayable"/>
                                        <property name="databaseSchema" value="public"/>
                                        <property name="databaseTable" value="entity_playable"/>
                                        <property name="keyFields">
                                            <list>
                                                <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                                    <constructor-arg><util:constant static-field="java.sql.Types.BIGINT"/></constructor-arg>
                                                    <constructor-arg value="gameId"/>
                                                    <constructor-arg value="long"/>
                                                    <constructor-arg value="gameId"/>
                                                </bean>
                                                <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                                    <constructor-arg><util:constant static-field="java.sql.Types.BIGINT"/></constructor-arg>
                                                    <constructor-arg value="playableId"/>
                                                    <constructor-arg value="long"/>
                                                    <constructor-arg value="playableId"/>
                                                </bean>
                                                <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                                    <constructor-arg><util:constant static-field="java.sql.Types.BIGINT"/></constructor-arg>
                                                    <constructor-arg value="companyId"/>
                                                    <constructor-arg value="long"/>
                                                    <constructor-arg value="companyId"/>
                                                </bean>
                                                <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                                    <constructor-arg><util:constant static-field="java.sql.Types.INTEGER"/></constructor-arg>
                                                    <constructor-arg value="version"/>
                                                    <constructor-arg value="int"/>
                                                    <constructor-arg value="version"/>
                                                </bean>
                                            </list>
                                        </property>
                                        <property name="valueFields">
                                            <list>
                                                <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                                    <constructor-arg><util:constant static-field="java.sql.Types.TIMESTAMP"/></constructor-arg>
                                                    <constructor-arg value="eventTimestamp"/>
                                                    <constructor-arg value="java.sql.Timestamp"/>
                                                    <constructor-arg value="eventTimestamp"/>
                                                </bean>
                                                <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                                    <constructor-arg><util:constant static-field="java.sql.Types.BIGINT"/></constructor-arg>
                                                    <constructor-arg value="eventTimestampSys"/>
                                                    <constructor-arg value="java.lang.Long"/>
                                                    <constructor-arg value="eventTimestampSys"/>
                                                </bean>
                                                <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                                    <constructor-arg><util:constant static-field="java.sql.Types.BIGINT"/></constructor-arg>
                                                    <constructor-arg value="companyIdPartition"/>
                                                    <constructor-arg value="java.lang.Long"/>
                                                    <constructor-arg value="companyIdPartition"/>
                                                </bean>
                                                <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                                    <constructor-arg><util:constant static-field="java.sql.Types.BIGINT"/></constructor-arg>
                                                    <constructor-arg value="partitionkey"/>
                                                    <constructor-arg value="java.lang.Long"/>
                                                    <constructor-arg value="partitionkey"/>
                                                </bean>
                                            </list>
                                        </property>
                                    </bean>
                                </list>
                            </property>
                        </bean>
                    </property>

                    <property name="readThrough" value="true"/>
                    <property name="writeBehindEnabled" value="true"/>
                    <property name="writeBehindBatchSize" value="1000"/>
                    <property name="writeBehindFlushSize" value="0"/>

                    <property name="queryEntities">
                        <list>
                            <bean class="org.apache.ignite.cache.QueryEntity">
                                <property name="keyType" value="com.gmail.patil.j.harshal.model.EntityPlayableKey"/>
                                <property name="valueType" value="com.gmail.patil.j.harshal.model.EntityPlayable"/>
                                <property name="tableName" value="entity_playable"/>
                                <property name="keyFields">
                                    <list>
                                        <value>gameId</value>
                                        <value>playableId</value>
                                        <value>companyId</value>
                                        <value>version</value>
                                    </list>
                                </property>
                                <property name="fields">
                                    <map>
                                        <entry key="gameId" value="java.lang.Long"/>
                                        <entry key="playableId" value="java.lang.Long"/>
                                        <entry key="companyId" value="java.lang.Long"/>
                                        <entry key="version" value="java.lang.Integer"/>
                                        <entry key="eventTimestamp" value="java.sql.Timestamp"/>
                                        <entry key="eventTimestampSys" value="java.lang.Long"/>
                                        <entry key="companyIdPartition" value="java.lang.Long"/>
                                        <entry key="partitionkey" value="java.lang.Long"/>
                                    </map>
                                </property>
                                <property name="indexes">
                                    <list>
                                        <bean class="org.apache.ignite.cache.QueryIndex">
                                            <property name="name" value="company_id_partition_hash_entity_playable_hash"/>
                                            <property name="indexType" value="SORTED"/>
                                            <property name="fields">
                                                <map>
                                                    <entry key="companyIdPartition" value="false"/>
                                                </map>
                                            </property>
                                        </bean>
                                        <bean class="org.apache.ignite.cache.QueryIndex">
                                            <property name="name" value="companyId_entity_playable_hash"/>
                                            <property name="indexType" value="SORTED"/>
                                            <property name="fields">
                                                <map>
                                                    <entry key="companyId" value="false"/>
                                                </map>
                                            </property>
                                        </bean>
                                        <bean class="org.apache.ignite.cache.QueryIndex">
                                            <property name="name" value="gameId_entity_playable_hash"/>
                                            <property name="indexType" value="SORTED"/>
                                            <property name="fields">
                                                <map>
                                                    <entry key="gameId" value="false"/>
                                                </map>
                                            </property>
                                        </bean>
                                        <bean class="org.apache.ignite.cache.QueryIndex">
                                            <property name="name" value="company_id_partition_entity_playable_normal"/>
                                            <property name="indexType" value="SORTED"/>
                                            <property name="fields">
                                                <map>
                                                    <entry key="companyIdPartition" value="false"/>
                                                </map>
                                            </property>
                                        </bean>
                                        <bean class="org.apache.ignite.cache.QueryIndex">
                                            <property name="name" value="companyId_entity_playable_normal"/>
                                            <property name="indexType" value="SORTED"/>
                                            <property name="fields">
                                                <map>
                                                    <entry key="companyId" value="false"/>
                                                </map>
                                            </property>
                                        </bean>
                                        <bean class="org.apache.ignite.cache.QueryIndex">
                                            <property name="name" value="gameId_entity_playable_normal"/>
                                            <property name="indexType" value="SORTED"/>
                                            <property name="fields">
                                                <map>
                                                    <entry key="gameId" value="false"/>
                                                </map>
                                            </property>
                                        </bean>
                                    </list>
                                </property>
                            </bean>
                        </list>
                    </property>
                </bean>
            </list>
        </property>
    </bean>
</beans>
And I also have the XML file above. Can you tell me whether the issue is that
I use the XML file when writing the DataFrame, while I loaded the cache through
an IgniteContext into which I injected the CacheConfiguration via a function
call?
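One thing worth checking before comparing the two set-ups: the XML and the Java configuration in this post are not identical. For example, the XML declares the query field playableId while createConfiguration() declares playableid, and the XML enables write-behind while the Java code sets write-through. The sketch below (plain Java, no Ignite, hypothetical helper names) diffs two query-entity field maps by exact name and declared type. Whether a case difference like this matters at runtime depends on how Ignite maps binary field names, which the sketch does not model.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class QueryEntityFieldDiff {
    /** Lists names present on only one side and names whose declared type differs. */
    static List<String> diff(Map<String, String> first, Map<String, String> second) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> e : first.entrySet()) {
            String otherType = second.get(e.getKey());
            if (otherType == null) {
                problems.add("only in first: " + e.getKey());
            } else if (!otherType.equals(e.getValue())) {
                problems.add("type mismatch: " + e.getKey());
            }
        }
        for (String name : second.keySet()) {
            if (!first.containsKey(name)) {
                problems.add("only in second: " + name);
            }
        }
        return problems;
    }

    /** Builds an insertion-ordered field map from name/type pairs. */
    static Map<String, String> fields(String... nameTypePairs) {
        if (nameTypePairs.length % 2 != 0) {
            throw new IllegalArgumentException("expected name/type pairs");
        }
        Map<String, String> m = new LinkedHashMap<>();
        for (int i = 0; i < nameTypePairs.length; i += 2) {
            m.put(nameTypePairs[i], nameTypePairs[i + 1]);
        }
        return m;
    }

    public static void main(String[] args) {
        // Key fields as declared in the XML query entity.
        Map<String, String> fromXml = fields(
                "gameId", "java.lang.Long", "playableId", "java.lang.Long",
                "companyId", "java.lang.Long", "version", "java.lang.Integer");
        // The same fields as declared in createConfiguration().
        Map<String, String> fromJava = fields(
                "gameId", "java.lang.Long", "playableid", "java.lang.Long",
                "companyId", "java.lang.Long", "version", "java.lang.Integer");
        // prints "only in first: playableId", then "only in second: playableid"
        diff(fromXml, fromJava).forEach(System.out::println);
    }
}
```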
On Tue, Mar 26, 2019 at 4:05 PM Nikolay Izhikov <ni...@apache.org> wrote:
> Hello, Harshal
>
> Can you, please, share your Ignite config?
> Especially, "*ENTITY_PLAYABLE*" cache definition
>
> On Tue, Mar 26, 2019 at 05:35, Denis Magda <dm...@apache.org> wrote:
>
>> Hi, as far as I can guess from the shared details, you should pass the
>> IgniteCache name as a SQL schema if SQL metadata was configured via XML or
>> annotations. Try this "INSERT INTO cacheName.ENTITY_PLAYABLE".
>>
>> -
>> Denis
>>
Re: Spark DataFrame to Ignite write issue
Posted by Nikolay Izhikov <ni...@apache.org>.
Hello Harshal,
Could you please share your Ignite config?
Especially the "*ENTITY_PLAYABLE*" cache definition.
On Tue, Mar 26, 2019 at 05:35, Denis Magda <dm...@apache.org> wrote:
> Hi, as far as I can guess from the shared details, you should pass the
> IgniteCache name as a SQL schema if SQL metadata was configured via XML or
> annotations. Try this "INSERT INTO cacheName.ENTITY_PLAYABLE".
>
> -
> Denis
>
Re: Spark DataFrame to Ignite write issue
Posted by Denis Magda <dm...@apache.org>.
Hi, as far as I can guess from the shared details, you should pass the
IgniteCache name as the SQL schema if the SQL metadata was configured via XML or
annotations. Try "INSERT INTO cacheName.ENTITY_PLAYABLE".
-
Denis
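A minimal sketch of what Denis's suggestion amounts to at the SQL level. It assumes, as we understand Ignite's defaults, that a cache configured with QueryEntity exposes its table in a schema named after the cache, and that H2 upper-cases unquoted identifiers (so entity_playable and ENTITY_PLAYABLE name the same table). Under those assumptions a mixed-case cache name such as EntityPlayableCache has to be double-quoted. The class and method names are hypothetical; the code only builds the statement string and does not change how the Spark writer names the table.

```java
import java.util.Collections;
import java.util.List;
import java.util.Locale;

public class QualifiedInsert {
    /** Double-quotes an SQL identifier so its case is preserved, escaping embedded quotes. */
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    /** Builds INSERT INTO "<cacheName>".<TABLE>(cols) VALUES(?,...). */
    static String insertSql(String cacheName, String table, List<String> columns) {
        // An unquoted table name is normalized to upper case, so spell it that way.
        String placeholders = String.join(",", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + quote(cacheName) + "." + table.toUpperCase(Locale.ROOT)
                + "(" + String.join(",", columns) + ") VALUES(" + placeholders + ")";
    }

    public static void main(String[] args) {
        String sql = insertSql("EntityPlayableCache", "entity_playable",
                List.of("GAMEID", "PLAYABLEID", "COMPANYID", "VERSION"));
        // prints: INSERT INTO "EntityPlayableCache".ENTITY_PLAYABLE(GAMEID,PLAYABLEID,COMPANYID,VERSION) VALUES(?,?,?,?)
        System.out.println(sql);
    }
}
```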