Posted to user@ignite.apache.org by Ray <ra...@cisco.com> on 2018/09/18 09:47:24 UTC

Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?

Currently, the OPTION_DISABLE_SPARK_SQL_OPTIMIZATION option can only be set at
the Spark session level.
That means a given Spark job can use either Ignite optimization or Spark
optimization, but not both.

Let's say I want to load data into Spark memory with pushdown filters using
Ignite optimization.
For example, I want to load one day's data using this SQL: "select * from
tableA where date = '2018-09-01'".
With Ignite optimization, this SQL is executed on Ignite and the where
clause filter is applied on the Ignite side.
But with Spark optimization, all the data in this table is loaded into
Spark memory and filtered afterwards.

Then I want to join the filtered tableA with the filtered tableB, which is
also loaded from Ignite.
But I want to use Spark's join feature to do the join, because both the
filtered tableA and the filtered tableB contain millions of rows and Ignite
is not optimized for joins.
How can I do that?
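
To make it concrete, here is a rough sketch of what I'd like to end up with,
using the Ignite Spark DataFrame integration (the config path, table names,
and join key below are just placeholders):

import org.apache.ignite.spark.IgniteDataFrameSettings;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("Example").getOrCreate();

// Read tableA from Ignite; ideally the where-clause filter is pushed down
// and executed on the Ignite side.
Dataset<Row> a = spark.read()
        .format(IgniteDataFrameSettings.FORMAT_IGNITE())
        .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), "ignite-config.xml")
        .option(IgniteDataFrameSettings.OPTION_TABLE(), "tableA")
        .load()
        .filter("date = '2018-09-01'");

// Same for tableB.
Dataset<Row> b = spark.read()
        .format(IgniteDataFrameSettings.FORMAT_IGNITE())
        .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), "ignite-config.xml")
        .option(IgniteDataFrameSettings.OPTION_TABLE(), "tableB")
        .load()
        .filter("date = '2018-09-01'");

// The join itself should be executed by Spark, not Ignite.
Dataset<Row> joined = a.join(b, "someKey");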




Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?

Posted by Ilya Kasnacheev <il...@gmail.com>.
Hello!

I suggest that you check out these possibilities:
Does performance increase dramatically if you only need 10% of the data,
i.e., ~1 million records?
Does anything change when you have only one client connected?

Note that I was running this example on a single node, so it should not be
hard to create an environment to check that out.

Note that it is not recommended to have (thick) clients, or especially
servers, connected over slow network links.

Regards,
-- 
Ilya Kasnacheev



Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?

Posted by Ray <ra...@cisco.com>.
Actually there's only one row in b.

SELECT COUNT(*) FROM b where x = '1';
COUNT(*)  1

1 row selected (0.003 seconds)

Maybe it's because the join performance drops dramatically when the data size
is more than 10 million rows, or when the cluster has a lot of clients
connected?
My 6-node cluster has 10 clients connected to it, and some of them have slow
network connectivity.






Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?

Posted by "ilya.kasnacheev" <il...@gmail.com>.
Hello!

I have indeed tried a use case like yours:

0: jdbc:ignite:thin://127.0.0.1/> create index on b(x,y); 
No rows affected (9,729 seconds)
0: jdbc:ignite:thin://127.0.0.1/> select count(*) from a;
COUNT(*)  1

1 row selected (0,017 seconds)
0: jdbc:ignite:thin://127.0.0.1/> select count(*) from b;
COUNT(*)  4194304

1 row selected (0,024 seconds)
0: jdbc:ignite:thin://127.0.0.1/> select a.x,a.y from a join b where a.y = b.y and a.x = b.x;
X  1
Y  1

1 row selected (0,005 seconds)
0: jdbc:ignite:thin://127.0.0.1/> explain select a.x,a.y from a join b where a.y = b.y and a.x = b.x;
PLAN  SELECT
    __Z0.X AS __C0_0,
    __Z0.Y AS __C0_1
FROM PUBLIC.A __Z0
    /* PUBLIC.A.__SCAN_ */
INNER JOIN PUBLIC.B __Z1
    /* PUBLIC."b_x_asc_y_asc_idx": Y = __Z0.Y
        AND X = __Z0.X
     */
    ON 1=1
WHERE (__Z0.Y = __Z1.Y)
    AND (__Z0.X = __Z1.X)

PLAN  SELECT
    __C0_0 AS X,
    __C0_1 AS Y
FROM PUBLIC.__T0
    /* PUBLIC."merge_scan" */

2 rows selected (0,007 seconds)
^ very fast, compared to 1,598 seconds before index was created

My standing idea is that you have very low selectivity on b.x. I.e., if 10
million out of 14 million rows in b have x = 1, then the index will not be
able to help and will only hurt. Can you execute SELECT COUNT(*) FROM b
WHERE x = 1; on your dataset?
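
If it is easier to script this than to run it in sqlline, here is a minimal
sketch of the same check over the thin JDBC driver (assuming ignite-core is
on the classpath; the address is a placeholder):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SelectivityCheck {
    public static void main(String[] args) throws Exception {
        // The thin JDBC driver is registered automatically by ignite-core.
        try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM b WHERE x = '1'")) {
            rs.next();
            // A count close to the table size means the (x, y) index cannot
            // narrow down the scan.
            System.out.println("Rows with x = '1': " + rs.getLong(1));
        }
    }
}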

Regards,




Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?

Posted by Ray <ra...@cisco.com>.
Here's the detailed information for my join test.

0: jdbc:ignite:thin://sap-datanode6/> select * from a;
x  1
y  1
A       bearbrick

1 row selected (0.002 seconds)
0: jdbc:ignite:thin://sap-datanode6/> select count(*) from b;
COUNT(*)  14337959

1 row selected (0.299 seconds)
0: jdbc:ignite:thin://sap-datanode6/> select x,y from b where _key = '1';
x  1
y  1

1 row selected (0.002 seconds)


select a.x,a.y from a join b where a.x = b.x and a.y = b.y;
x  1
y  1

1 row selected (6.036 seconds)  -- Takes 6 seconds to join a table with one
row to a 14-million-row table using affinity key x

explain select a.x,a.y from a join b where a.x = b.x and a.y = b.y;

PLAN  SELECT
    A__Z0.x AS __C0_0,
    A__Z0.y AS __C0_1
FROM PUBLIC.B__Z1
    /* PUBLIC.B.__SCAN_ */
INNER JOIN PUBLIC.T A__Z0
    /* PUBLIC.AFFINITY_KEY: x = B__Z1.x */
    ON 1=1
WHERE (A__Z0.y = B__Z1.y)
    AND (A__Z0.x = B__Z1.x)

PLAN  SELECT
    __C0_0 AS x,
    __C0_1 AS y
FROM PUBLIC.__T0
    /* PUBLIC."merge_scan" */

If I create an index on table b on fields x and y, it takes 6.8 seconds to
finish the join.

create index on b(x,y);
No rows affected (31.316 seconds)

0: jdbc:ignite:thin://sap-datanode6/> select a.x,a.y from a join b where a.y = b.y and a.x = b.x;
x  1
y  1

1 row selected (6.865 seconds)

0: jdbc:ignite:thin://sap-datanode6/> explain select a.x,a.y from a join b where a.y = b.y and a.x = b.x;
PLAN  SELECT
    A__Z0.x AS __C0_0,
    A__Z0.y AS __C0_1
FROM PUBLIC.T A__Z0
    /* PUBLIC.T.__SCAN_ */
INNER JOIN PUBLIC.B__Z1
    /* PUBLIC."b_x_asc_y_asc_idx": y = A__Z0.y
        AND x = A__Z0.x
     */
    ON 1=1
WHERE (A__Z0.y = B__Z1.y)
    AND (A__Z0.x = B__Z1.x)

PLAN  SELECT
    __C0_0 AS x,
    __C0_1 AS y
FROM PUBLIC.__T0
    /* PUBLIC."merge_scan" */

2 rows selected (0.003 seconds)

Here's my configuration

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="segmentationPolicy" value="RESTART_JVM"/>
        <property name="peerClassLoadingEnabled" value="true"/>
        <property name="failureDetectionTimeout" value="60000"/>
        <property name="dataStorageConfiguration">
            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                <property name="defaultDataRegionConfiguration">
                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                        <property name="name" value="default_Region"/>
                        <property name="initialSize" value="#{100L * 1024 * 1024 * 1024}"/>
                        <property name="maxSize" value="#{460L * 1024 * 1024 * 1024}"/>
                        <property name="persistenceEnabled" value="true"/>
                        <property name="checkpointPageBufferSize" value="#{8L * 1024 * 1024 * 1024}"/>
                    </bean>
                </property>
                <property name="walMode" value="BACKGROUND"/>
                <property name="walFlushFrequency" value="5000"/>
                <property name="checkpointFrequency" value="600000"/>
            </bean>
        </property>
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="localPort" value="49500"/>
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <value>node1:49500</value>
                                <value>node2:49500</value>
                                <value>node3:49500</value>
                                <value>node4:49500</value>
                                <value>node5:49500</value>
                                <value>node6:49500</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
        <property name="gridLogger">
            <bean class="org.apache.ignite.logger.log4j2.Log4J2Logger">
                <constructor-arg type="java.lang.String" value="config/ignite-log4j2.xml"/>
            </bean>
        </property>
    </bean>
</beans>




Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?

Posted by vkulichenko <va...@gmail.com>.
Ray,

This sounds suspicious. Please show your configuration and the execution
plan for the query.

-Val




Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?

Posted by Ilya Kasnacheev <il...@gmail.com>.
Hello!

Can you show the index that you are creating here?

Regards,
-- 
Ilya Kasnacheev



Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?

Posted by Ray <ra...@cisco.com>.
Let's say I have two tables I want to join together.
Table a has around 10 million rows, and its primary key is (x, y).
I have created an index on fields x and y for table a.

Table b has one row, and its primary key is also (x, y).
That row in table b has a corresponding row in table a with the same primary
key.

When I try to execute this join query, "select a.*,b.* from a inner join b
where (a.x=b.x) and (a.y = b.y);", it takes more than 4 seconds to return
only one record.
I also examined the plan for that SQL and confirmed that the index I created
is used for this query.

Ideally, with a hash join this should take less than half a second.





Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?

Posted by vkulichenko <va...@gmail.com>.
If the join is indexed and collocated, it can still be pretty fast. Do you
have a particular query that is slower with optimization than without?
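
For reference, here is a rough sketch of what a collocated pair of tables
looks like when created over the thin JDBC driver (the table and column
definitions are hypothetical, just to illustrate the affinity key):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CollocatedTables {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/");
             Statement stmt = conn.createStatement()) {
            // Both tables use x as the affinity key, so rows with equal x are
            // stored on the same node and the join needs no cross-node traffic.
            stmt.execute("CREATE TABLE a (x VARCHAR, y VARCHAR, v VARCHAR, " +
                "PRIMARY KEY (x, y)) WITH \"affinity_key=x\"");
            stmt.execute("CREATE TABLE b (x VARCHAR, y VARCHAR, v VARCHAR, " +
                "PRIMARY KEY (x, y)) WITH \"affinity_key=x\"");
            // An index on the join columns lets the engine avoid a full scan.
            stmt.execute("CREATE INDEX b_x_y ON b (x, y)");
        }
    }
}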

-Val




Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?

Posted by Ray <ra...@cisco.com>.
Hi Val, thanks for the reply.

I'll try again and let you know if I missed something.

By "Ignite is not optimized for join", I mean currently Ignite only supports
nest loop join which is very inefficient when joining two large table.
Please refer to these two tickets for details.
https://issues.apache.org/jira/browse/IGNITE-6201
https://issues.apache.org/jira/browse/IGNITE-6202
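
A toy sketch of the two join strategies (not Ignite code, just to show why
the nested loop approach degrades on two large inputs):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class JoinSketch {
    // Nested loop join: for each row of a, scan all of b -- O(|a| * |b|).
    static List<long[]> nestedLoopJoin(long[] a, long[] b) {
        List<long[]> out = new ArrayList<>();
        for (long x : a)
            for (long y : b)
                if (x == y)
                    out.add(new long[] {x, y});
        return out;
    }

    // Hash join: build a hash table on b once, then probe it -- O(|a| + |b|).
    static List<long[]> hashJoin(long[] a, long[] b) {
        Set<Long> built = new HashSet<>();
        for (long y : b)
            built.add(y);
        List<long[]> out = new ArrayList<>();
        for (long x : a)
            if (built.contains(x))
                out.add(new long[] {x, x});
        return out;
    }
}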




Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?

Posted by vkulichenko <va...@gmail.com>.
Ray,

Per my understanding, pushdown filters are propagated to Ignite either way;
they are not related to the "optimization". The optimization affects joins,
groupings, aggregations, etc. So, unless I'm missing something, the behavior
you're looking for is achieved by setting
OPTION_DISABLE_SPARK_SQL_OPTIMIZATION to true.
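
Something along these lines, as a rough sketch (the option constant comes
from IgniteDataFrameSettings; the app name is a placeholder):

import org.apache.ignite.spark.IgniteDataFrameSettings;
import org.apache.spark.sql.SparkSession;

// The option is session-wide: it hands SQL execution for Ignite-backed
// DataFrames to Ignite instead of Spark's own optimizer.
SparkSession spark = SparkSession
    .builder()
    .appName("IgniteOptimized")
    .config(IgniteDataFrameSettings.OPTION_DISABLE_SPARK_SQL_OPTIMIZATION(), true)
    .getOrCreate();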

However, can you please clarify what you mean by "Ignite is not optimized for
join"?

-Val




Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?

Posted by aealexsandrov <ae...@gmail.com>.
Hi,

I am not sure that it will work, but you can try the following:

        SparkSession spark = SparkSession
            .builder()
            .appName("SomeAppName")
            .master("spark://10.0.75.1:7077")
            .config(OPTION_DISABLE_SPARK_SQL_OPTIMIZATION, "false") // or "true"
            .getOrCreate();

        JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());

        // The IgniteContext ties the Spark context to an Ignite configuration
        // ("ignite-config.xml" is a placeholder path).
        JavaIgniteContext<K, T> igniteContext =
            new JavaIgniteContext<>(sparkContext, "ignite-config.xml");

        JavaIgniteRDD<K, T> igniteRdd1 = igniteContext.<K, T>fromCache("CACHE1");
        JavaIgniteRDD<K, T> igniteRdd2 = igniteContext.<K, T>fromCache("CACHE2");

        // Here the Ignite SQL processor will be used, because sql() runs a
        // SqlFieldsQuery inside Ignite.
        Dataset<Row> ds1 = igniteRdd1.sql("select * from CACHE1");
        Dataset<Row> ds2 = igniteRdd2.sql("select * from CACHE2");

        // Here the Spark SQL processor will be used.
        ds1.join(ds2).where(<SOME_COND>);

BR,
Andrei


