You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2018/04/20 08:59:32 UTC

[1/3] ignite git commit: IGNITE-7077: Implementation of Spark query optimization. - Fixes #3397.

Repository: ignite
Updated Branches:
  refs/heads/master 80086011e -> 10a4c48bf


http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationMathFuncSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationMathFuncSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationMathFuncSpec.scala
new file mode 100644
index 0000000..02793c9
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationMathFuncSpec.scala
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose}
+import org.apache.spark.sql.ignite.IgniteSparkSession
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import java.lang.{Double ⇒ JDouble, Long ⇒ JLong}
+
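+/** Checks the SQL produced for Spark math functions and operators over the `numbers` table, and their results.
+  */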
+@RunWith(classOf[JUnitRunner])
+class IgniteOptimizationMathFuncSpec extends AbstractDataFrameSpec {
+    var igniteSession: IgniteSparkSession = _
+
+    // Each case runs a Spark SQL query against the `numbers` table, hands the expected optimized SQL to
+    // checkOptimizationResult and, except for RAND, checks the returned rows with checkQueryData.
+    // Row values referenced by id are inserted in createNumberTable.
+    describe("Supported optimized math functions") {
+        it("ABS") {
+            val df = igniteSession.sql("SELECT ABS(val) FROM numbers WHERE id = 6")
+
+            checkOptimizationResult(df, "SELECT ABS(val) FROM numbers WHERE id is not null AND id = 6")
+
+            val data = Tuple1(.5)
+
+            checkQueryData(df, data)
+        }
+
+        it("ACOS") {
+            val df = igniteSession.sql("SELECT ACOS(val) FROM numbers WHERE id = 7")
+
+            checkOptimizationResult(df, "SELECT ACOS(val) FROM numbers WHERE id is not null AND id = 7")
+
+            val data = Tuple1(Math.PI)
+
+            checkQueryData(df, data)
+        }
+
+        it("ASIN") {
+            val df = igniteSession.sql("SELECT ASIN(val) FROM numbers WHERE id = 7")
+
+            checkOptimizationResult(df, "SELECT ASIN(val) FROM numbers WHERE id is not null AND id = 7")
+
+            val data = Tuple1(-Math.PI/2)
+
+            checkQueryData(df, data)
+        }
+
+        it("ATAN") {
+            val df = igniteSession.sql("SELECT ATAN(val) FROM numbers WHERE id = 7")
+
+            checkOptimizationResult(df, "SELECT ATAN(val) FROM numbers WHERE id is not null AND id = 7")
+
+            val data = Tuple1(-Math.PI/4)
+
+            checkQueryData(df, data)
+        }
+
+        it("COS") {
+            val df = igniteSession.sql("SELECT COS(val) FROM numbers WHERE id = 1")
+
+            checkOptimizationResult(df, "SELECT COS(val) FROM numbers WHERE id is not null AND id = 1")
+
+            val data = Tuple1(1.0)
+
+            checkQueryData(df, data)
+        }
+
+        it("SIN") {
+            val df = igniteSession.sql("SELECT SIN(val) FROM numbers WHERE id = 1")
+
+            checkOptimizationResult(df, "SELECT SIN(val) FROM numbers WHERE id is not null AND id = 1")
+
+            val data = Tuple1(.0)
+
+            checkQueryData(df, data)
+        }
+
+        it("TAN") {
+            val df = igniteSession.sql("SELECT TAN(val) FROM numbers WHERE id = 1")
+
+            checkOptimizationResult(df, "SELECT TAN(val) FROM numbers WHERE id is not null AND id = 1")
+
+            val data = Tuple1(.0)
+
+            checkQueryData(df, data)
+        }
+
+        it("COSH") {
+            val df = igniteSession.sql("SELECT COSH(val) FROM numbers WHERE id = 1")
+
+            checkOptimizationResult(df, "SELECT COSH(val) FROM numbers WHERE id is not null AND id = 1")
+
+            val data = Tuple1(1.0)
+
+            checkQueryData(df, data)
+        }
+
+        it("SINH") {
+            val df = igniteSession.sql("SELECT SINH(val) FROM numbers WHERE id = 1")
+
+            checkOptimizationResult(df, "SELECT SINH(val) FROM numbers WHERE id is not null AND id = 1")
+
+            val data = Tuple1(.0)
+
+            checkQueryData(df, data)
+        }
+
+        it("TANH") {
+            val df = igniteSession.sql("SELECT TANH(val) FROM numbers WHERE id = 1")
+
+            checkOptimizationResult(df, "SELECT TANH(val) FROM numbers WHERE id is not null AND id = 1")
+
+            val data = Tuple1(.0)
+
+            checkQueryData(df, data)
+        }
+
+        it("ATAN2") {
+            val df = igniteSession.sql("SELECT ATAN2(val, 0.0) FROM numbers WHERE id = 1")
+
+            checkOptimizationResult(df, "SELECT ATAN2(val, 0.0) AS \"ATAN2(val, CAST(0.0 AS DOUBLE))\" " +
+                "FROM numbers WHERE id is not null AND id = 1")
+
+            val data = Tuple1(.0)
+
+            checkQueryData(df, data)
+        }
+
+        it("MOD") {
+            val df = igniteSession.sql("SELECT val % 9 FROM numbers WHERE id = 8")
+
+            checkOptimizationResult(df, "SELECT val % 9.0 as \"(val % CAST(9 AS DOUBLE))\" " +
+                "FROM numbers WHERE id is not null AND id = 8")
+
+            val data = Tuple1(6.0)
+
+            checkQueryData(df, data)
+        }
+
+        it("CEIL") {
+            val df = igniteSession.sql("SELECT CEIL(val) FROM numbers WHERE id = 2")
+
+            checkOptimizationResult(df, "SELECT CAST(CEIL(val) AS LONG) as \"CEIL(val)\" " +
+                "FROM numbers WHERE id is not null AND id = 2")
+
+            val data = Tuple1(1)
+
+            checkQueryData(df, data)
+        }
+
+        it("ROUND") {
+            val df = igniteSession.sql("SELECT id, ROUND(val) FROM numbers WHERE  id IN (2, 9, 10)")
+
+            // The expected SQL carries an explicit scale of 0 for ROUND.
+            checkOptimizationResult(df, "SELECT id, ROUND(val, 0) FROM numbers WHERE id IN (2, 9, 10)")
+
+            val data = ((2, 1.0), (9, 1.0), (10, 0.0))
+
+            checkQueryData(df, data)
+        }
+
+        it("FLOOR") {
+            val df = igniteSession.sql("SELECT FLOOR(val) FROM numbers WHERE id = 2")
+
+            checkOptimizationResult(df, "SELECT CAST(FLOOR(val) AS LONG) as \"FLOOR(val)\" FROM numbers " +
+                "WHERE id is not null AND id = 2")
+
+            val data = Tuple1(0)
+
+            checkQueryData(df, data)
+        }
+
+        it("POWER") {
+            val df = igniteSession.sql("SELECT POWER(val, 3) FROM numbers WHERE id = 4")
+
+            checkOptimizationResult(df, "SELECT POWER(val, 3.0) as \"POWER(val, CAST(3 AS DOUBLE))\" FROM numbers " +
+                "WHERE id is not null AND id = 4")
+
+            val data = Tuple1(8.0)
+
+            checkQueryData(df, data)
+        }
+
+        it("EXP") {
+            val df = igniteSession.sql("SELECT id, EXP(val) FROM numbers WHERE id IN (1, 3)")
+
+            checkOptimizationResult(df, "SELECT id, EXP(val) FROM numbers WHERE id IN (1, 3)")
+
+            val data = ((1, 1), (3, Math.E))
+
+            checkQueryData(df, data)
+        }
+
+        it("LOG") {
+            val df = igniteSession.sql("SELECT LOG(val) FROM numbers WHERE id = 12")
+
+            checkOptimizationResult(df, "SELECT LOG(val) as \"LOG(E(), val)\" FROM numbers " +
+                "WHERE id IS NOT NULL AND id = 12")
+
+            val data = Tuple1(2.0)
+
+            checkQueryData(df, data)
+        }
+
+        it("LOG10") {
+            val df = igniteSession.sql("SELECT LOG10(val) FROM numbers WHERE id = 11")
+
+            checkOptimizationResult(df, "SELECT LOG10(val) FROM numbers WHERE id IS NOT NULL AND id = 11")
+
+            val data = Tuple1(2.0)
+
+            checkQueryData(df, data)
+        }
+
+        it("DEGREES") {
+            val df = igniteSession.sql("SELECT DEGREES(val) FROM numbers WHERE id = 13")
+
+            checkOptimizationResult(df, "SELECT DEGREES(val) FROM numbers WHERE id IS NOT NULL AND id = 13")
+
+            val data = Tuple1(180.0)
+
+            checkQueryData(df, data)
+        }
+
+        it("RADIANS") {
+            val df = igniteSession.sql("SELECT RADIANS(val) FROM numbers WHERE id = 14")
+
+            checkOptimizationResult(df, "SELECT RADIANS(val) FROM numbers WHERE id IS NOT NULL AND id = 14")
+
+            val data = Tuple1(Math.PI)
+
+            checkQueryData(df, data)
+        }
+
+        it("BITAND") {
+            val df = igniteSession.sql("SELECT int_val&1 FROM numbers WHERE id = 15")
+
+            checkOptimizationResult(df, "SELECT BITAND(int_val, 1) as \"(int_val & CAST(1 AS BIGINT))\" FROM numbers " +
+                "WHERE id IS NOT NULL AND id = 15")
+
+            val data = Tuple1(1)
+
+            checkQueryData(df, data)
+        }
+
+        it("BITOR") {
+            val df = igniteSession.sql("SELECT int_val|1 FROM numbers WHERE id = 16")
+
+            checkOptimizationResult(df, "SELECT BITOR(int_val, 1) as \"(int_val | CAST(1 AS BIGINT))\" FROM numbers " +
+                "WHERE id IS NOT NULL AND id = 16")
+
+            val data = Tuple1(3)
+
+            checkQueryData(df, data)
+        }
+
+        it("BITXOR") {
+            val df = igniteSession.sql("SELECT int_val^1 FROM numbers WHERE id = 17")
+
+            checkOptimizationResult(df, "SELECT BITXOR(int_val, 1) AS \"(int_val ^ CAST(1 AS BIGINT))\" FROM numbers " +
+                "WHERE id IS NOT NULL AND id = 17")
+
+            val data = Tuple1(2)
+
+            checkQueryData(df, data)
+        }
+
+        it("RAND") {
+            val df = igniteSession.sql("SELECT id, RAND(1) FROM numbers WHERE id = 17")
+
+            checkOptimizationResult(df, "SELECT id, RAND(1) FROM numbers WHERE id IS NOT NULL AND id = 17")
+
+            // Only the id and the presence of a value in the random column are asserted.
+            val data = df.rdd.collect
+
+            assert(data(0).getAs[JLong]("id") == 17L)
+            assert(data(0).getAs[JDouble]("rand(1)") != null)
+        }
+    }
+
+    /**
+      * Creates the `numbers` table through the given cache and inserts the rows queried by the tests.
+      *
+      * Rows 1-14 populate only `val` (DOUBLE); rows 15-17 populate only `int_val` (LONG).
+      *
+      * @param client Ignite instance used to obtain the cache.
+      * @param cacheName Name of the cache the DDL and INSERT statements are executed through.
+      */
+    def createNumberTable(client: Ignite, cacheName: String): Unit = {
+        val cache = client.cache(cacheName)
+
+        cache.query(new SqlFieldsQuery(
+            """
+              | CREATE TABLE numbers (
+              |    id LONG,
+              |    val DOUBLE,
+              |    int_val LONG,
+              |    PRIMARY KEY (id)) WITH "backups=1"
+            """.stripMargin)).getAll
+
+        val doubleRows = Seq(1L -> .0, 2L -> .5, 3L -> 1.0, 4L -> 2.0, 5L -> 4.0, 6L -> -0.5, 7L -> -1.0, 8L -> 42.0,
+            9L -> .51, 10L -> .49, 11L -> 100.0, 12L -> (Math.E * Math.E), 13L -> Math.PI, 14L -> 180.0)
+
+        val insertVal = new SqlFieldsQuery("INSERT INTO numbers (id, val) values (?, ?)")
+
+        doubleRows.foreach { case (id, v) ⇒
+            cache.query(insertVal.setArgs(JLong.valueOf(id), JDouble.valueOf(v))).getAll
+        }
+
+        val insertInt = new SqlFieldsQuery("INSERT INTO numbers (id, int_val) values (?, ?)")
+
+        Seq(15L -> 1L, 16L -> 2L, 17L -> 3L).foreach { case (id, v) ⇒
+            cache.query(insertInt.setArgs(JLong.valueOf(id), JLong.valueOf(v))).getAll
+        }
+    }
+
+    /** Creates the `numbers` table, then builds the Ignite Spark session from a "client-2" client configuration. */
+    override protected def beforeAll(): Unit = {
+        super.beforeAll()
+
+        createNumberTable(client, DEFAULT_CACHE)
+
+        // The supplier loads TEST_CONFIG_FILE when invoked and switches the loaded configuration to client mode.
+        val clientCfgProvider = enclose(null) (_ ⇒ () ⇒ {
+            val clientCfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
+
+            clientCfg.setClientMode(true)
+            clientCfg.setIgniteInstanceName("client-2")
+
+            clientCfg
+        })
+
+        val sessionBuilder = IgniteSparkSession.builder().config(spark.sparkContext.getConf)
+
+        igniteSession = sessionBuilder.igniteConfigProvider(clientCfgProvider).getOrCreate()
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala
new file mode 100644
index 0000000..00075f4
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.spark.sql.ignite.IgniteSparkSession
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
+import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose}
+
+/** Checks which Spark SQL queries over the `city` and `person` tables get the expected optimized SQL.
+  */
+@RunWith(classOf[JUnitRunner])
+class IgniteOptimizationSpec extends AbstractDataFrameSpec {
+    var igniteSession: IgniteSparkSession = _
+
+    // Queries for which an optimized SQL text is expected: each case hands that text to
+    // checkOptimizationResult and most also verify the returned rows with checkQueryData.
+    describe("Optimized queries") {
+        it("SELECT name as city_name FROM city") {
+            val df = igniteSession.sql("SELECT name as city_name FROM city")
+
+            checkOptimizationResult(df, "SELECT name as city_name FROM city")
+        }
+
+        it("SELECT count(1) as city_count FROM city") {
+            val df = igniteSession.sql("SELECT count(1) as city_count FROM city")
+
+            checkOptimizationResult(df, "SELECT count(1) as city_count FROM city")
+        }
+
+        it("SELECT city_id, count(*) FROM person GROUP BY city_id") {
+            val df = igniteSession.sql("SELECT city_id, count(*) FROM person GROUP BY city_id")
+
+            checkOptimizationResult(df, "SELECT city_id, count(1) FROM person GROUP BY city_id")
+
+            val data = ((1, 1), (2, 3), (3, 1))
+
+            checkQueryData(df, data)
+        }
+
+        it("SELECT id, name FROM person WHERE id > 3 ORDER BY id") {
+            val df = igniteSession.sql("SELECT id, name FROM person WHERE id > 3 ORDER BY id")
+
+            checkOptimizationResult(df, "SELECT id, name FROM person WHERE id IS NOT NULL AND id > 3 ORDER BY id")
+
+            val data = (
+                (4, "Richard Miles"),
+                (5, null))
+
+            checkQueryData(df, data)
+        }
+
+        it("SELECT id, name FROM person WHERE id > 3 ORDER BY id DESC") {
+            val df = igniteSession.sql("SELECT id, name FROM person WHERE id > 3 ORDER BY id DESC")
+
+            checkOptimizationResult(df, "SELECT id, name FROM person WHERE id IS NOT NULL AND id > 3 ORDER BY id DESC")
+
+            val data = (
+                (5, null),
+                (4, "Richard Miles"))
+
+            // NOTE(review): the third argument looks like a row sort key (negated id, i.e. descending) - confirm.
+            checkQueryData(df, data, -_.getAs[Long]("id"))
+        }
+
+        it("SELECT id, test_reverse(name) FROM city ORDER BY id") {
+            igniteSession.udf.register("test_reverse", (str: String) ⇒ str.reverse)
+
+            val df = igniteSession.sql("SELECT id, test_reverse(name) FROM city ORDER BY id")
+
+            // The expected SQL contains neither the UDF call nor the ORDER BY, only the column fetch.
+            checkOptimizationResult(df, "SELECT name, id FROM city")
+
+            val data = (
+                (1, "Forest Hill".reverse),
+                (2, "Denver".reverse),
+                (3, "St. Petersburg".reverse),
+                (4, "St. Petersburg".reverse))
+
+            checkQueryData(df, data)
+        }
+
+        it("SELECT city_id, count(*) FROM person p GROUP BY city_id HAVING count(*) > 1") {
+            val df = igniteSession.sql("SELECT city_id, count(*) FROM person p GROUP BY city_id HAVING count(*) > 1")
+
+            checkOptimizationResult(df, "SELECT city_id, count(1) FROM person GROUP BY city_id HAVING count(1) > 1")
+
+            val data = Tuple1(
+                (2, 3))
+
+            checkQueryData(df, data)
+        }
+
+        it("SELECT id FROM city HAVING id > 1") {
+            val df = igniteSession.sql("SELECT id FROM city HAVING id > 1")
+
+            checkOptimizationResult(df, "SELECT id FROM city WHERE id IS NOT NULL AND id > 1")
+
+            val data = (2, 3, 4)
+
+            checkQueryData(df, data)
+        }
+
+        it("SELECT DISTINCT name FROM city ORDER BY name") {
+            val df = igniteSession.sql("SELECT DISTINCT name FROM city ORDER BY name")
+
+            checkOptimizationResult(df, "SELECT name FROM city GROUP BY name ORDER BY name")
+
+            val data = ("Denver", "Forest Hill", "St. Petersburg")
+
+            checkQueryData(df, data)
+        }
+
+        it("SELECT id, name FROM city ORDER BY id, name") {
+            val df = igniteSession.sql("SELECT id, name FROM city ORDER BY id, name")
+
+            checkOptimizationResult(df, "SELECT id, name FROM city ORDER BY id, name")
+
+            val data = (
+                (1, "Forest Hill"),
+                (2, "Denver"),
+                (3, "St. Petersburg"),
+                (4, "St. Petersburg"))
+
+            checkQueryData(df, data)
+        }
+
+        it("SELECT id, name FROM city WHERE id > 1 ORDER BY id") {
+            val df = igniteSession.sql("SELECT id, name FROM city WHERE id > 1 ORDER BY id")
+
+            checkOptimizationResult(df, "SELECT id, name FROM city WHERE id IS NOT NULL and id > 1 ORDER BY id")
+
+            val data = (
+                (2, "Denver"),
+                (3, "St. Petersburg"),
+                (4, "St. Petersburg"))
+
+            checkQueryData(df, data)
+        }
+
+        it("SELECT count(*) FROM city") {
+            val df = igniteSession.sql("SELECT count(*) FROM city")
+
+            checkOptimizationResult(df, "SELECT count(1) FROM city")
+
+            val data = Tuple1(4)
+
+            checkQueryData(df, data)
+        }
+
+        it("SELECT count(DISTINCT name) FROM city") {
+            val df = igniteSession.sql("SELECT count(DISTINCT name) FROM city")
+
+            checkOptimizationResult(df, "SELECT count(DISTINCT name) FROM city")
+
+            val data = Tuple1(3)
+
+            checkQueryData(df, data)
+        }
+
+        it("SELECT id FROM city LIMIT 2") {
+            val df = igniteSession.sql("SELECT id FROM city LIMIT 2")
+
+            checkOptimizationResult(df, "SELECT id FROM city LIMIT 2")
+
+            val data = (1, 2)
+
+            checkQueryData(df, data)
+        }
+
+        it("SELECT CAST(id AS STRING) FROM city") {
+            val df = igniteSession.sql("SELECT CAST(id AS STRING) FROM city")
+
+            checkOptimizationResult(df, "SELECT CAST(id AS varchar) as id FROM city")
+
+            val data = ("1", "2", "3", "4")
+
+            checkQueryData(df, data)
+        }
+
+        it("SELECT SQRT(id) FROM city WHERE id = 4 OR id = 1") {
+            val df = igniteSession.sql("SELECT SQRT(id) FROM city WHERE id = 4 OR id = 1")
+
+            checkOptimizationResult(df,
+                "SELECT SQRT(cast(id as double)) FROM city WHERE id = 4 OR id = 1")
+
+            val data = (1, 2)
+
+            checkQueryData(df, data)
+        }
+
+        it("SELECT CONCAT(id, \" - this is ID\") FROM city") {
+            val df = igniteSession.sql("SELECT CONCAT(id, \" - this is ID\") FROM city")
+
+            checkOptimizationResult(df,
+                "SELECT CONCAT(cast(id AS VARCHAR), ' - this is ID') as \"CONCAT(cast(id AS STRING),  - this is ID)\" " +
+                    "FROM city")
+
+            val data = (
+                "1 - this is ID",
+                "2 - this is ID",
+                "3 - this is ID",
+                "4 - this is ID")
+
+            checkQueryData(df, data)
+        }
+
+        it("SELECT id FROM city WHERE CONCAT(id, \" - this is ID\") = \"1 - this is ID\"") {
+            val df = igniteSession.sql("SELECT id FROM city WHERE CONCAT(id, \" - this is ID\") = \"1 - this is ID\"")
+
+            checkOptimizationResult(df,
+                "SELECT id FROM city WHERE CONCAT(CAST(id AS VARCHAR), ' - this is ID') = '1 - this is ID'")
+
+            val data = Tuple1(1)
+
+            checkQueryData(df, data)
+        }
+    }
+
+    // Queries for which no expected SQL is given: checkOptimizationResult receives the DataFrame only.
+    // NOTE(review): presumably that form asserts the plan was not rewritten - confirm in
+    // AbstractDataFrameSpec.
+    describe("Not Optimized Queries") {
+        it("SELECT id, name FROM json_cities") {
+            // The view is backed by a JSON file read through Spark, not by an Ignite table.
+            val jsonPath = resolveIgnitePath("modules/spark/src/test/resources/cities.json").getAbsolutePath
+
+            igniteSession.read.json(jsonPath).createOrReplaceTempView("JSON_CITIES")
+
+            val frame = igniteSession.sql("SELECT id, name FROM json_cities")
+
+            val expected = ((1, "Forest Hill"), (2, "Denver"), (3, "St. Petersburg"))
+
+            checkQueryData(frame, expected)
+        }
+
+        it("SELECT id, test_reverse(name) tr FROM city WHERE test_reverse(name) = 'revneD' ORDER BY id") {
+            val sql = "SELECT id, test_reverse(name) tr " +
+                "FROM city WHERE test_reverse(name) = 'revneD' ORDER BY id"
+
+            checkOptimizationResult(igniteSession.sql(sql))
+        }
+
+        it("SELECT id, test_reverse(name) tr FROM city WHERE test_reverse(name) = 'revneD' and id > 0 ORDER BY id") {
+            val sql = "SELECT id, test_reverse(name) tr " +
+                "FROM city WHERE test_reverse(name) = 'revneD' and id > 0 ORDER BY id"
+
+            checkOptimizationResult(igniteSession.sql(sql))
+        }
+
+        it("SELECT id, test_reverse(name) tr FROM city ORDER BY tr") {
+            val frame = igniteSession.sql("SELECT id, test_reverse(name) tr FROM city ORDER BY tr")
+
+            checkOptimizationResult(frame)
+        }
+
+        it("SELECT count(*), test_reverse(name) tr FROM city GROUP BY test_reverse(name)") {
+            val frame = igniteSession.sql("SELECT count(*), test_reverse(name) tr FROM city GROUP BY test_reverse(name)")
+
+            checkOptimizationResult(frame)
+        }
+    }
+
+    /** Creates the person and city tables, builds the Ignite Spark session and registers the `test_reverse` UDF. */
+    override protected def beforeAll(): Unit = {
+        super.beforeAll()
+
+        createPersonTable(client, DEFAULT_CACHE)
+        createCityTable(client, DEFAULT_CACHE)
+
+        // Supplier of a client-mode configuration named "client-2", loaded from TEST_CONFIG_FILE when invoked.
+        val clientCfgProvider = enclose(null) (_ ⇒ () ⇒ {
+            val clientCfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
+
+            clientCfg.setClientMode(true)
+            clientCfg.setIgniteInstanceName("client-2")
+
+            clientCfg
+        })
+
+        val sessionBuilder = IgniteSparkSession.builder().config(spark.sparkContext.getConf)
+
+        igniteSession = sessionBuilder.igniteConfigProvider(clientCfgProvider).getOrCreate()
+
+        // String-reversing UDF referenced by the test queries as `test_reverse`.
+        igniteSession.udf.register("test_reverse", (text: String) ⇒ text.reverse)
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationStringFuncSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationStringFuncSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationStringFuncSpec.scala
new file mode 100644
index 0000000..db106f2
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationStringFuncSpec.scala
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose}
+import org.apache.spark.sql.ignite.IgniteSparkSession
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import java.lang.{Long ⇒ JLong}
+
+/**
+  * === Not supported by Spark ===
+  * CHAR
+  * DIFFERENCE
+  * HEXTORAW
+  * RAWTOHEX
+  * REGEXP_LIKE
+  * SOUNDEX
+  * STRINGDECODE
+  * STRINGENCODE
+  * STRINGTOUTF8
+  * UTF8TOSTRING
+  * XMLATTR
+  * XMLNODE
+  * XMLCOMMENT
+  * XMLCDATA
+  * XMLSTARTDOC
+  * XMLTEXT
+  * TO_CHAR - The function that can format a timestamp, a number, or text.
+  * ====== These functions are in Spark master but not in a release =====
+  * LEFT
+  * RIGHT
+  * INSERT
+  * REPLACE
+  */
+@RunWith(classOf[JUnitRunner])
+class IgniteOptimizationStringFuncSpec extends AbstractDataFrameSpec {
+    var igniteSession: IgniteSparkSession = _
+
+    describe("Supported optimized string functions") {
+        it("LENGTH") {
+            val df = igniteSession.sql("SELECT LENGTH(str) FROM strings WHERE id <= 3")
+
+            checkOptimizationResult(df, "SELECT CAST(LENGTH(str) AS INTEGER) as \"length(str)\" FROM strings " +
+                "WHERE id is not null AND id <= 3")
+
+            val data = (3, 3, 6)
+
+            checkQueryData(df, data)
+        }
+
+        it("RTRIM") {
+            val df = igniteSession.sql("SELECT RTRIM(str) FROM strings WHERE id = 3")
+
+            checkOptimizationResult(df, "SELECT RTRIM(str) FROM strings WHERE id is not null AND id = 3")
+
+            val data = Tuple1("AAA")
+
+            checkQueryData(df, data)
+        }
+
+        it("LTRIM") {
+            val df = igniteSession.sql("SELECT LTRIM(str) FROM strings WHERE id = 4")
+
+            checkOptimizationResult(df, "SELECT LTRIM(str) FROM strings WHERE id is not null AND id = 4")
+
+            val data = Tuple1("AAA")
+
+            checkQueryData(df, data)
+        }
+
+        it("TRIM") {
+            val df = igniteSession.sql("SELECT TRIM(str) FROM strings WHERE id = 5")
+
+            checkOptimizationResult(df, "SELECT TRIM(str) FROM strings WHERE id is not null AND id = 5")
+
+            val data = Tuple1("AAA")
+
+            checkQueryData(df, data)
+        }
+
+        it("LOWER") {
+            val df = igniteSession.sql("SELECT LOWER(str) FROM strings WHERE id = 2")
+
+            checkOptimizationResult(df, "SELECT LOWER(str) FROM strings WHERE id is not null AND id = 2")
+
+            val data = Tuple1("aaa")
+
+            checkQueryData(df, data)
+        }
+
+        it("UPPER") {
+            val df = igniteSession.sql("SELECT UPPER(str) FROM strings WHERE id = 1")
+
+            checkOptimizationResult(df, "SELECT UPPER(str) FROM strings WHERE id is not null AND id = 1")
+
+            val data = Tuple1("AAA")
+
+            checkQueryData(df, data)
+        }
+
+        it("LOWER(RTRIM)") {
+            val df = igniteSession.sql("SELECT LOWER(RTRIM(str)) FROM strings WHERE id = 3")
+
+            checkOptimizationResult(df, "SELECT LOWER(RTRIM(str)) FROM strings WHERE id is not null AND id = 3")
+
+            val data = Tuple1("aaa")
+
+            checkQueryData(df, data)
+        }
+
+        it("LOCATE") {
+            val df = igniteSession.sql("SELECT LOCATE('D', str) FROM strings WHERE id = 6")
+
+            checkOptimizationResult(df, "SELECT LOCATE('D', str, 1) FROM strings WHERE id is not null AND id = 6")
+
+            val data = Tuple1(4)
+
+            checkQueryData(df, data)
+        }
+
+        it("LOCATE - 2") {
+            val df = igniteSession.sql("SELECT LOCATE('A', str) FROM strings WHERE id = 6")
+
+            checkOptimizationResult(df, "SELECT LOCATE('A', str, 1) FROM strings WHERE id is not null AND id = 6")
+
+            val data = Tuple1(1)
+
+            checkQueryData(df, data)
+        }
+
+        it("POSITION") {
+            val df = igniteSession.sql("SELECT instr(str, 'BCD') FROM strings WHERE id = 6")
+
+            checkOptimizationResult(df, "SELECT POSITION('BCD', str) as \"instr(str, BCD)\" FROM strings " +
+                "WHERE id is not null AND id = 6")
+
+            val data = Tuple1(2)
+
+            checkQueryData(df, data)
+        }
+
+        it("CONCAT") {
+            val df = igniteSession.sql("SELECT concat(str, 'XXX') FROM strings WHERE id = 6")
+
+            checkOptimizationResult(df, "SELECT concat(str, 'XXX') FROM strings WHERE id is not null AND id = 6")
+
+            val data = Tuple1("ABCDEFXXX")
+
+            checkQueryData(df, data)
+        }
+
+        it("RPAD") {
+            val df = igniteSession.sql("SELECT RPAD(str, 10, 'X') FROM strings WHERE id = 6")
+
+            checkOptimizationResult(df, "SELECT RPAD(str, 10, 'X') FROM strings WHERE id is not null AND id = 6")
+
+            val data = Tuple1("ABCDEFXXXX")
+
+            checkQueryData(df, data)
+        }
+
+        it("LPAD") {
+            val df = igniteSession.sql("SELECT LPAD(str, 10, 'X') FROM strings WHERE id = 6")
+
+            checkOptimizationResult(df, "SELECT LPAD(str, 10, 'X') FROM strings WHERE id is not null AND id = 6")
+
+            val data = Tuple1("XXXXABCDEF")
+
+            checkQueryData(df, data)
+        }
+
+        it("REPEAT") {
+            val df = igniteSession.sql("SELECT REPEAT(str, 2) FROM strings WHERE id = 6")
+
+            checkOptimizationResult(df, "SELECT REPEAT(str, 2) FROM strings WHERE id is not null AND id = 6")
+
+            val data = Tuple1("ABCDEFABCDEF")
+
+            checkQueryData(df, data)
+        }
+
+        it("SUBSTRING") {
+            val df = igniteSession.sql("SELECT SUBSTRING(str, 4, 3) FROM strings WHERE id = 6")
+
+            checkOptimizationResult(df, "SELECT SUBSTR(str, 4, 3) as \"SUBSTRING(str, 4, 3)\" FROM strings " +
+                "WHERE id is not null AND id = 6")
+
+            val data = Tuple1("DEF")
+
+            checkQueryData(df, data)
+        }
+
+        it("SPACE") {
+            val df = igniteSession.sql("SELECT SPACE(LENGTH(str)) FROM strings WHERE id = 1")
+
+            checkOptimizationResult(df, "SELECT SPACE(CAST(LENGTH(str) AS INTEGER)) as \"SPACE(LENGTH(str))\" " +
+                "FROM strings WHERE id is not null AND id = 1")
+
+            val data = Tuple1("   ")
+
+            checkQueryData(df, data)
+        }
+
+        it("ASCII") {
+            val df = igniteSession.sql("SELECT ASCII(str) FROM strings WHERE id = 7")
+
+            checkOptimizationResult(df, "SELECT ASCII(str) FROM strings WHERE id is not null AND id = 7")
+
+            val data = Tuple1(50)
+
+            checkQueryData(df, data)
+        }
+
+        it("REGEXP_REPLACE") {
+            val df = igniteSession.sql("SELECT REGEXP_REPLACE(str, '(\\\\d+)', 'num') FROM strings WHERE id = 7")
+
+            checkOptimizationResult(df, "SELECT REGEXP_REPLACE(str, '(\\d+)', 'num') FROM strings " +
+                "WHERE id is not null AND id = 7")
+
+            val data = Tuple1("num")
+
+            checkQueryData(df, data)
+        }
+
+        it("CONCAT_WS") {
+            val df = igniteSession.sql("SELECT id, CONCAT_WS(', ', str, 'after') FROM strings " +
+                "WHERE id >= 7 AND id <= 8")
+
+            checkOptimizationResult(df, "SELECT id, CONCAT_WS(', ', str, 'after') FROM strings " +
+                "WHERE id is not null AND id >= 7 AND id <= 8")
+
+            val data = (
+                (7, "222, after"),
+                (8, "after"))
+
+            checkQueryData(df, data)
+        }
+
+        it("TRANSLATE") {
+            val df = igniteSession.sql("SELECT id, TRANSLATE(str, 'DEF', 'ABC') FROM strings WHERE id = 6")
+
+            checkOptimizationResult(df, "SELECT id, TRANSLATE(str, 'DEF', 'ABC') FROM strings " +
+                "WHERE id is not null AND id = 6")
+
+            val data = Tuple1((6, "ABCABC"))
+
+            checkQueryData(df, data)
+        }
+    }
+
+    /**
+      * Creates the `strings` SQL table (`id LONG` primary key, `str VARCHAR`) and inserts the eight rows
+      * the string-function tests query.
+      *
+      * @param client Ignite instance whose cache is used to run the statements.
+      * @param cacheName Name of the cache through which the DDL and the inserts are executed.
+      */
+    def createStringTable(client: Ignite, cacheName: String): Unit = {
+        val ddl =
+            """
+              | CREATE TABLE strings (
+              |    id LONG,
+              |    str VARCHAR,
+              |    PRIMARY KEY (id)) WITH "backups=1"
+            """.stripMargin
+
+        val sqlCache = client.cache(cacheName)
+
+        sqlCache.query(new SqlFieldsQuery(ddl)).getAll
+
+        val insert = new SqlFieldsQuery("INSERT INTO strings (id, str) values (?, ?)")
+
+        // Row 8 stores a NULL string; the CONCAT_WS test expects just "after" for it.
+        val rows = Seq[(Long, String)](
+            (1L, "aaa"),
+            (2L, "AAA"),
+            (3L, "AAA   "),
+            (4L, "   AAA"),
+            (5L, "   AAA   "),
+            (6L, "ABCDEF"),
+            (7L, "222"),
+            (8L, null))
+
+        rows.foreach { case (id, str) ⇒
+            sqlCache.query(insert.setArgs(id.asInstanceOf[JLong], str)).getAll
+        }
+    }
+
+    /**
+      * Runs the parent setup, creates the `strings` table and then builds the `IgniteSparkSession`
+      * that every test of this spec queries.
+      */
+    override protected def beforeAll(): Unit = {
+        super.beforeAll()
+
+        createStringTable(client, DEFAULT_CACHE)
+
+        // The provider loads the test configuration and turns it into a client-mode
+        // configuration with the instance name "client-2".
+        val clientCfgProvider = enclose(null) (_ ⇒ () ⇒ {
+            val clientCfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
+
+            clientCfg.setClientMode(true)
+            clientCfg.setIgniteInstanceName("client-2")
+
+            clientCfg
+        })
+
+        igniteSession = IgniteSparkSession.builder()
+            .config(spark.sparkContext.getConf)
+            .igniteConfigProvider(clientCfgProvider)
+            .getOrCreate()
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSystemFuncSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSystemFuncSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSystemFuncSpec.scala
new file mode 100644
index 0000000..282a45f
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSystemFuncSpec.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.spark.sql.ignite.IgniteSparkSession
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import java.lang.{Double ⇒ JDouble, Long ⇒ JLong}
+
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose}
+
+/**
+  * Tests for SQL system functions (COALESCE, GREATEST, LEAST, IFNULL, NULLIF, NVL2) executed through
+  * an `IgniteSparkSession`.
+  *
+  * Every test runs a query against the `numbers` table created in `beforeAll`, passes the resulting
+  * data frame to `checkOptimizationResult` (with the expected SQL text where one is given) and then
+  * compares the returned rows with the expected values.
+  */
+@RunWith(classOf[JUnitRunner])
+class IgniteOptimizationSystemFuncSpec extends AbstractDataFrameSpec {
+    /** Session used by all tests; assigned in `beforeAll`. */
+    var igniteSession: IgniteSparkSession = _
+
+    describe("Supported optimized system functions") {
+        // Rows 1-3 have exactly one non-null value each (1, 2 and 3 respectively).
+        it("COALESCE") {
+            val df = igniteSession.sql("SELECT COALESCE(int_val1, int_val2) FROM numbers WHERE id IN (1, 2, 3)")
+
+            checkOptimizationResult(df, "SELECT COALESCE(int_val1, int_val2) FROM numbers WHERE id IN (1, 2, 3)")
+
+            val data = (1, 2, 3)
+
+            checkQueryData(df, data)
+        }
+
+        // Row 4 holds (3, 4) and row 5 holds (6, 5), so the greatest values are 4 and 6.
+        it("GREATEST") {
+            val df = igniteSession.sql("SELECT GREATEST(int_val1, int_val2) FROM numbers WHERE id IN (4, 5)")
+
+            checkOptimizationResult(df, "SELECT GREATEST(int_val1, int_val2) FROM numbers WHERE id IN (4, 5)")
+
+            val data = (4, 6)
+
+            checkQueryData(df, data)
+        }
+
+        // Same rows as in the GREATEST test; the least values are 3 and 5.
+        it("LEAST") {
+            val df = igniteSession.sql("SELECT LEAST(int_val1, int_val2) FROM numbers WHERE id IN (4, 5)")
+
+            checkOptimizationResult(df, "SELECT LEAST(int_val1, int_val2) FROM numbers WHERE id IN (4, 5)")
+
+            val data = (3, 5)
+
+            checkQueryData(df, data)
+        }
+
+        // The expected SQL uses COALESCE in place of IFNULL and aliases the column with the Spark column name.
+        it("IFNULL") {
+            val df = igniteSession.sql("SELECT IFNULL(int_val1, int_val2) FROM numbers WHERE id IN (1, 2, 3)")
+
+            checkOptimizationResult(df, "SELECT COALESCE(int_val1, int_val2) as \"ifnull(numbers.`int_val1`, numbers.`int_val2`)\" FROM numbers WHERE id IN (1, 2, 3)")
+
+            val data = (1, 2, 3)
+
+            checkQueryData(df, data)
+        }
+
+        // Row 6 holds two equal values (7, 7) and yields null; row 7 holds (8, 9) and yields 8.
+        // No expected SQL text is passed to checkOptimizationResult here.
+        it("NULLIF") {
+            val df = igniteSession.sql("SELECT id, NULLIF(int_val1, int_val2) FROM numbers WHERE id IN (6, 7)")
+
+            checkOptimizationResult(df)
+
+            val data = (
+                (6, null),
+                (7, 8))
+
+            checkQueryData(df, data)
+        }
+
+        // int_val1 is null only in row 2.
+        // No expected SQL text is passed to checkOptimizationResult here.
+        it("NVL2") {
+            val df = igniteSession.sql("SELECT id, NVL2(int_val1, 'not null', 'null') FROM numbers WHERE id IN (1, 2, 3)")
+
+            checkOptimizationResult(df)
+
+            val data = (
+                (1, "not null"),
+                (2, "null"),
+                (3, "not null"))
+
+            checkQueryData(df, data)
+        }
+    }
+
+    /**
+      * Creates the `numbers` SQL table (`id`, `int_val1`, `int_val2`, all LONG, `id` is the primary key)
+      * and inserts the seven rows the tests above rely on.
+      *
+      * @param client Ignite instance whose cache is used to run the statements.
+      * @param cacheName Name of the cache through which the DDL and the inserts are executed.
+      */
+    def createNumberTable(client: Ignite, cacheName: String): Unit = {
+        val cache = client.cache(cacheName)
+
+        cache.query(new SqlFieldsQuery(
+            """
+              | CREATE TABLE numbers (
+              |    id LONG,
+              |    int_val1 LONG,
+              |    int_val2 LONG,
+              |    PRIMARY KEY (id)) WITH "backups=1"
+            """.stripMargin)).getAll
+
+
+        val qry = new SqlFieldsQuery("INSERT INTO numbers (id, int_val1, int_val2) values (?, ?, ?)")
+
+        // Rows 1-3: one of the two values is null.
+        cache.query(qry.setArgs(1L.asInstanceOf[JLong], 1L.asInstanceOf[JLong], null)).getAll
+        cache.query(qry.setArgs(2L.asInstanceOf[JLong], null, 2L.asInstanceOf[JLong])).getAll
+        cache.query(qry.setArgs(3L.asInstanceOf[JLong], 3L.asInstanceOf[JLong], null)).getAll
+        // Rows 4-5: two different non-null values.
+        cache.query(qry.setArgs(4L.asInstanceOf[JLong], 3L.asInstanceOf[JLong], 4L.asInstanceOf[JLong])).getAll
+        cache.query(qry.setArgs(5L.asInstanceOf[JLong], 6L.asInstanceOf[JLong], 5L.asInstanceOf[JLong])).getAll
+        // Row 6: equal values; row 7: different values.
+        cache.query(qry.setArgs(6L.asInstanceOf[JLong], 7L.asInstanceOf[JLong], 7L.asInstanceOf[JLong])).getAll
+        cache.query(qry.setArgs(7L.asInstanceOf[JLong], 8L.asInstanceOf[JLong], 9L.asInstanceOf[JLong])).getAll
+    }
+
+    /**
+      * Runs the parent setup, creates the `numbers` table and then builds the `IgniteSparkSession`
+      * stored in `igniteSession`.
+      */
+    override protected def beforeAll(): Unit = {
+        super.beforeAll()
+
+        createNumberTable(client, DEFAULT_CACHE)
+
+        // The provider loads the test configuration and turns it into a client-mode
+        // configuration with the instance name "client-2".
+        val configProvider = enclose(null) (x ⇒ () ⇒ {
+            val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
+
+            cfg.setClientMode(true)
+
+            cfg.setIgniteInstanceName("client-2")
+
+            cfg
+        })
+
+        igniteSession = IgniteSparkSession.builder()
+            .config(spark.sparkContext.getConf)
+            .igniteConfigProvider(configProvider)
+            .getOrCreate()
+    }
+}


[2/3] ignite git commit: IGNITE-7077: Implementation of Spark query optimization. - Fixes #3397.

Posted by ni...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala
new file mode 100644
index 0000000..723e17a
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization.accumulator
+
+import org.apache.ignite.spark.impl.optimization.{IgniteQueryContext, exprToString, toAttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder}
+
+/**
+  * Query accumulator describing a UNION of several child queries with an optional ORDER BY.
+  *
+  * @param igniteQueryContext Context from which the table alias used as `qualifier` is taken.
+  * @param children Accumulators of the queries combined by the UNION.
+  * @param outputExpressions Output expressions of the whole UNION query.
+  * @param orderBy Optional sort orders applied to the result of the UNION.
+  */
+private[apache] case class UnionSQLAccumulator(
+    igniteQueryContext: IgniteQueryContext,
+    children: Seq[QueryAccumulator],
+    outputExpressions: Seq[NamedExpression],
+    orderBy: Option[Seq[SortOrder]] = None
+) extends QueryAccumulator {
+    /** @inheritdoc */
+    override def compileQuery(prettyPrint: Boolean = false): String = {
+        val (delim, tab) = if (prettyPrint) ("\n", "  ") else (" ", "")
+
+        // Compiled child queries joined by the UNION keyword.
+        val unionSql = children.map(_.compileQuery(prettyPrint)).mkString(s"${delim}UNION$delim")
+
+        // The ORDER BY clause is appended only when sort orders are present.
+        orderBy.fold(unionSql) { sortOrders ⇒
+            val orderSql = sortOrders.map(exprToString(_)).mkString(s",$delim$tab")
+
+            s"$unionSql${delim}ORDER BY $orderSql"
+        }
+    }
+
+    /** @inheritdoc */
+    override def simpleString: String = {
+        val orderByStr = orderBy.fold("[]")(_.map(exprToString(_)).mkString(", "))
+
+        s"UnionSQLAccumulator(orderBy: $orderByStr)"
+    }
+
+    /** @inheritdoc */
+    override def withOutputExpressions(outputExpressions: Seq[NamedExpression]): QueryAccumulator =
+        copy(outputExpressions = outputExpressions)
+
+    /** @inheritdoc */
+    override def withOrderBy(orderBy: Seq[SortOrder]): QueryAccumulator =
+        copy(orderBy = Some(orderBy))
+
+    /** @inheritdoc */
+    override def output: Seq[Attribute] =
+        outputExpressions.map(expr ⇒ toAttributeReference(expr, Seq.empty))
+
+    /** @inheritdoc */
+    override lazy val qualifier: String = igniteQueryContext.uniqueTableAlias
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala
new file mode 100644
index 0000000..4e168f4
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+
+import org.apache.ignite.IgniteException
+import org.apache.ignite.spark.impl.optimization.accumulator.{QueryAccumulator, SingleTableSQLAccumulator}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, Expression, NamedExpression}
+import org.apache.spark.sql.types._
+
+import scala.annotation.tailrec
+
+/**
+  */
+package object optimization {
+    /**
+      * Constant to store alias in column metadata.
+      */
+    private[optimization] val ALIAS: String = "alias"
+
+    /**
+      * All `SupportedExpressions` implementations.
+      */
+    private val SUPPORTED_EXPRESSIONS: List[SupportedExpressions] = List (
+        SimpleExpressions,
+        SystemExpressions,
+        AggregateExpressions,
+        ConditionExpressions,
+        DateExpressions,
+        MathExpressions,
+        StringExpressions
+    )
+
+    /**
+      * Converts `expr` to its string representation by asking the entries of `SUPPORTED_EXPRESSIONS`
+      * one after another; the first entry that returns a result wins.
+      *
+      * @param expr Expression.
+      * @param useQualifier If true outputs attributes of `expr` with qualifier.
+      * @param useAlias If true outputs `expr` with alias.
+      * @return String representation of expression.
+      * @throws IgniteException If no entry of `SUPPORTED_EXPRESSIONS` can convert `expr`.
+      */
+    def exprToString(expr: Expression, useQualifier: Boolean = false, useAlias: Boolean = true): String = {
+        @tailrec
+        def exprToString0(supportedExpressions: List[SupportedExpressions]): Option[String] =
+            supportedExpressions match {
+                case handler :: rest ⇒
+                    val exprStr = handler.toString(
+                        expr,
+                        // Callback for nested expressions: same qualifier setting, aliases switched off.
+                        exprToString(_, useQualifier, useAlias = false),
+                        useQualifier,
+                        useAlias)
+
+                    exprStr match {
+                        case res @ Some(_) ⇒
+                            res
+                        case None ⇒
+                            exprToString0(rest)
+                    }
+
+                case Nil ⇒
+                    None
+            }
+
+        exprToString0(SUPPORTED_EXPRESSIONS) match {
+            case Some(str) ⇒ str
+
+            case None ⇒
+                throw new IgniteException("Unsupported expression " + expr)
+        }
+    }
+
+    /**
+      * Checks a list of expressions by applying the single-expression `exprsAllowed` check to each element.
+      *
+      * @param exprs Expressions to check.
+      * @return True if every expression in `exprs` is allowed (i.e. can be pushed down to Ignite), false otherwise.
+      *         An empty `exprs` yields true.
+      */
+    def exprsAllowed(exprs: Seq[Expression]): Boolean =
+        exprs.forall(exprsAllowed)
+
+    /**
+      * Checks a single expression against the entries of `SUPPORTED_EXPRESSIONS`.
+      *
+      * @param expr Expression to check.
+      * @return True if at least one entry of `SUPPORTED_EXPRESSIONS` accepts `expr` (i.e. it can be pushed down
+      *         to Ignite), false otherwise.
+      */
+    def exprsAllowed(expr: Expression): Boolean =
+        // This method is also handed to each entry as the second argument - presumably so that an entry
+        // can validate child expressions; confirm against `SupportedExpressions.apply`.
+        SUPPORTED_EXPRESSIONS.exists(_(expr, exprsAllowed))
+
+    /**
+      * Builds an `AttributeReference` that stands for `input` in a query output.
+      *
+      * When an alias is known (passed in, or taken from an enclosing `Alias`), it is stored in the metadata
+      * of the result under the `ALIAS` key.
+      *
+      * @param input Expression to convert.
+      * @param existingOutput Existing output; an attribute with the same expression ID found here is used
+      *                       as the source of name, type, metadata and qualifier instead of `input`.
+      * @param exprId Optional expression ID to use.
+      * @param alias Optional alias for a result.
+      * @return Converted expression.
+      * @throws IgniteException If `input` is neither a named expression nor an allowed expression.
+      */
+    def toAttributeReference(input: Expression, existingOutput: Seq[NamedExpression], exprId: Option[ExprId] = None,
+        alias: Option[String] = None): AttributeReference = {
+
+        // Metadata that carries nothing but the alias; empty when no alias is known.
+        def aliasOnlyMetadata: Metadata =
+            alias.fold(Metadata.empty)(aliasName ⇒ new MetadataBuilder().putString(ALIAS, aliasName).build())
+
+        input match {
+            case attr: AttributeReference ⇒
+                val source = existingOutput.find(_.exprId == attr.exprId).getOrElse(attr)
+
+                AttributeReference(
+                    name = source.name,
+                    dataType = source.dataType,
+                    metadata = alias.fold(source.metadata) { aliasName ⇒
+                        new MetadataBuilder().withMetadata(source.metadata).putString(ALIAS, aliasName).build()
+                    }
+                )(
+                    exprId = exprId.getOrElse(source.exprId),
+                    qualifier = source.qualifier,
+                    isGenerated = source.isGenerated)
+
+            case a: Alias ⇒
+                // An alias passed by the caller takes precedence over the name of this `Alias` node.
+                toAttributeReference(a.child, existingOutput, Some(a.exprId), alias.orElse(Some(a.name)))
+
+            case agg: AggregateExpression ⇒
+                agg.aggregateFunction match {
+                    case count: Count ⇒
+                        val args = count.children.map(exprToString(_)).mkString(" ")
+
+                        val distinctPrefix = if (agg.isDistinct) "DISTINCT " else ""
+
+                        AttributeReference(
+                            name = s"COUNT($distinctPrefix$args)",
+                            dataType = LongType,
+                            metadata = aliasOnlyMetadata
+                        )(exprId = exprId.getOrElse(agg.resultId))
+
+                    case otherFunction ⇒
+                        toAttributeReference(otherFunction, existingOutput, exprId.orElse(Some(agg.resultId)), alias)
+                }
+
+            case ne: NamedExpression ⇒
+                AttributeReference(
+                    name = exprToString(input),
+                    dataType = input.dataType,
+                    // Metadata of `ne` is kept only together with an alias.
+                    metadata = alias.fold(Metadata.empty) { aliasName ⇒
+                        new MetadataBuilder().withMetadata(ne.metadata).putString(ALIAS, aliasName).build()
+                    }
+                )(exprId = exprId.getOrElse(ne.exprId))
+
+            case _ if exprsAllowed(input) ⇒
+                AttributeReference(
+                    name = exprToString(input),
+                    dataType = input.dataType,
+                    metadata = aliasOnlyMetadata
+                )(exprId = exprId.getOrElse(NamedExpression.newExprId))
+
+            case _ ⇒
+                throw new IgniteException(s"Unsupported column expression $input")
+        }
+    }
+
+    /**
+      * Maps a Spark data type to the name of the corresponding SQL type.
+      *
+      * @param dataType Spark data type.
+      * @return SQL data type.
+      * @throws IgniteException If `dataType` has no mapping.
+      */
+    def toSqlType(dataType: DataType): String = dataType match {
+        // Integral types.
+        case ByteType ⇒ "TINYINT"
+        case ShortType ⇒ "SMALLINT"
+        case IntegerType ⇒ "INT"
+        case LongType ⇒ "BIGINT"
+
+        // Fractional types.
+        case FloatType ⇒ "REAL"
+        case DoubleType ⇒ "DOUBLE"
+        case _: DecimalType ⇒ "DECIMAL"
+
+        // Date and time types.
+        case DateType ⇒ "DATE"
+        case TimestampType ⇒ "TIMESTAMP"
+
+        // Remaining types.
+        case BooleanType ⇒ "BOOLEAN"
+        case StringType ⇒ "VARCHAR"
+        case BinaryType ⇒ "BINARY"
+        case _: ArrayType ⇒ "ARRAY"
+
+        case unsupported ⇒
+            throw new IgniteException(s"$unsupported not supported!")
+    }
+
+    /**
+      * Searches the expression tree rooted at `expr` for an aggregate.
+      *
+      * @param expr Root of the expression tree to inspect.
+      * @return True if `expr` itself or any of its descendants is an `AggregateExpression`, false otherwise.
+      */
+    def hasAggregateInside(expr: Expression): Boolean = expr match {
+        case _: AggregateExpression ⇒
+            true
+
+        case other: Expression ⇒
+            other.children.exists(hasAggregateInside)
+    }
+
+    /**
+      * Tells whether `acc` describes a simple query, i.e. a `SELECT ... FROM table WHERE ...` like query
+      * without grouping, limit, ordering, DISTINCT or aggregates in the output.
+      *
+      * @param acc Accumulator to check.
+      * @return True if `acc` is a `SingleTableSQLAccumulator` with a defined table and none of the features
+      *         listed above, false otherwise.
+      */
+    def isSimpleTableAcc(acc: QueryAccumulator): Boolean = acc match {
+        case single: SingleTableSQLAccumulator ⇒
+            single.table.isDefined &&
+                single.groupBy.isEmpty &&
+                single.localLimit.isEmpty &&
+                single.orderBy.isEmpty &&
+                !(single.distinct || single.outputExpressions.exists(hasAggregateInside))
+
+        case _ ⇒
+            false
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
index 4634a97..6502c0f 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
@@ -19,12 +19,15 @@ package org.apache.ignite.spark
 
 import org.apache.commons.lang.StringUtils.equalsIgnoreCase
 import org.apache.ignite.{Ignite, IgniteException, IgniteState, Ignition}
-import org.apache.ignite.cache.QueryEntity
+import org.apache.ignite.cache.{CacheMode, QueryEntity}
+import org.apache.ignite.cluster.ClusterNode
 import org.apache.ignite.configuration.CacheConfiguration
 import org.apache.ignite.internal.util.lang.GridFunc.contains
+import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
 
 package object impl {
     /**
@@ -129,4 +132,47 @@ package object impl {
       */
     def isKeyColumn(table: QueryEntity, column: String): Boolean =
         contains(table.getKeyFields, column) || equalsIgnoreCase(table.getKeyFieldName, column)
+
+    /**
+      * Computes spark partitions for a given cache.
+      *
+      * For a REPLICATED cache a single Spark partition is returned: it is bound to the first server node
+      * of the cache and lists the Ignite partition ids 0 to 1023. For any other cache mode the Ignite
+      * partitions are grouped by their primary node and one Spark partition is created per node.
+      *
+      * @param ic Ignite context.
+      * @param cacheName Cache name
+      * @return Array of IgniteDataFramePartition
+      * @throws IgniteException If the cache is REPLICATED and no server node is found for it.
+      */
+    def calcPartitions(ic: IgniteContext, cacheName: String): Array[Partition] = {
+        val cache = ic.ignite().cache[Any, Any](cacheName)
+
+        val ccfg = cache.getConfiguration(classOf[CacheConfiguration[Any, Any]])
+
+        if (ccfg.getCacheMode == CacheMode.REPLICATED) {
+            val serverNodes = ic.ignite().cluster().forCacheNodes(cacheName).forServers().nodes()
+
+            // Fail with a descriptive error instead of a bare NoSuchElementException from `head`.
+            val node = serverNodes.headOption.getOrElse(
+                throw new IgniteException(s"No server nodes found for cache '$cacheName'"))
+
+            // NOTE(review): the partition count 1024 is hard-coded here rather than read from the cache
+            // affinity - confirm it matches the configured number of partitions.
+            Array(IgniteDataFramePartition(0, node, (0 until 1024).toList))
+        }
+        else {
+            val aff = ic.ignite().affinity(cacheName)
+
+            val parts = aff.partitions()
+
+            // Group Ignite partition indexes by the primary node that owns them.
+            val nodesToParts = (0 until parts).foldLeft(Map[ClusterNode, ArrayBuffer[Int]]()) {
+                case (nodeToParts, ignitePartIdx) ⇒
+                    val primary = aff.mapPartitionToPrimaryAndBackups(ignitePartIdx).head
+
+                    if (nodeToParts.contains(primary)) {
+                        nodeToParts(primary) += ignitePartIdx
+
+                        nodeToParts
+                    }
+                    else
+                        nodeToParts + (primary → ArrayBuffer[Int](ignitePartIdx))
+            }
+
+            // One Spark partition per primary node, numbered in iteration order of the map.
+            val partitions = nodesToParts.zipWithIndex.map { case ((node, nodesParts), i) ⇒
+                IgniteDataFramePartition(i, node, nodesParts.toList)
+            }
+
+            partitions.toArray
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
new file mode 100644
index 0000000..b23cd6f
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.ignite
+
+import org.apache.ignite.IgniteException
+import org.apache.ignite.spark.impl.{IgniteSQLAccumulatorRelation, IgniteSQLRelation, sqlCacheName}
+import org.apache.ignite.spark.impl.optimization.{accumulator, _}
+import org.apache.ignite.spark.impl.optimization.accumulator._
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+/**
+  * Query plan optimization for Ignite-based queries.
+  */
+object IgniteOptimization extends Rule[LogicalPlan] with Logging {
+    /**
+      * Pushes the supported operators down to Ignite, fixes ambiguous output names and
+      * finally replaces every accumulator with a relation. The plan is written to the debug log
+      * before and after the push down.
+      *
+      * @inheritdoc
+      */
+    override def apply(plan: LogicalPlan): LogicalPlan = {
+        //Writes an empty line, a header and the plan itself to the debug log.
+        def logPlan(header: String, logged: LogicalPlan): Unit = {
+            logDebug("")
+            logDebug(header)
+            logDebug(logged.toString())
+        }
+
+        logPlan("== Plan Before Ignite Operator Push Down ==", plan)
+
+        val pushedDown = pushDownOperators(plan)
+
+        val transformed = fixAmbiguousOutput(pushedDown)
+
+        logPlan("== Plan After Ignite Operator Push Down ==", transformed)
+
+        makeIgniteAccRelation(transformed)
+    }
+
+    /**
+      * Changes the query plan by accumulating the query parts supported by Ignite into a `QueryAccumulator`.
+      *
+      * The plan is traversed bottom-up. Every `LogicalRelation` backed by an `IgniteSQLRelation` is replaced with
+      * a `SingleTableSQLAccumulator`, and the operators found above it (`Project`, `Sort`, `Filter`, `Aggregate`,
+      * `LocalLimit`, `GlobalLimit`, `Union`, `Join`) are folded into the accumulator(s) of their children.
+      * A node that is not listed here, or that does not pass its push-down guard, is left unchanged and disables
+      * the push down for the nodes above it until the next Ignite relation is met.
+      *
+      * @param plan Query plan.
+      * @return Transformed plan.
+      * @throws IgniteException If the cache of an Ignite table can't be resolved, or if an operator that is about
+      *     to be pushed down has a child that is not an accumulator of the expected kind.
+      */
+    private def pushDownOperators(plan: LogicalPlan): LogicalPlan = {
+        val aliasIndexIterator = Stream.from(1).iterator
+
+        //Flag to indicate that some step was skipped due to an unsupported expression.
+        //While it is true we have to skip the transformation of the higher level nodes.
+        var stepSkipped = true
+
+        //Every operator case below is guarded by `!stepSkipped`, so its child is expected to be an accumulator.
+        //If it is not, the invariant is broken and we fail with a message describing the real state.
+        def unexpectedChild(operator: String, expected: String, child: LogicalPlan): Nothing =
+            throw new IgniteException(
+                s"stepSkipped == false but child of $operator is not $expected [child=${child.getClass.getName}]")
+
+        //Applying optimization rules from the bottom to the top of the tree.
+        plan.transformUp {
+            //We found a basic node to transform.
+            //We create a new accumulator and go to the upper layers.
+            case LogicalRelation(igniteSqlRelation: IgniteSQLRelation[_, _], output, _catalogTable) ⇒
+                //Clear flag to optimize each statement separately
+                stepSkipped = false
+
+                val igniteQueryContext = IgniteQueryContext(
+                    igniteContext = igniteSqlRelation.ic,
+                    sqlContext = igniteSqlRelation.sqlContext,
+                    catalogTable = _catalogTable,
+                    aliasIndex = aliasIndexIterator,
+                    cacheName =
+                        sqlCacheName(igniteSqlRelation.ic.ignite(), igniteSqlRelation.tableName)
+                            .getOrElse(throw new IgniteException("Unknown table")))
+
+                //Logical Relation is the bottom-most TreeNode in LogicalPlan.
+                //We replace it with an accumulator.
+                //We push all supported SQL operators into it on the higher tree levels.
+                SingleTableSQLAccumulator(
+                    igniteQueryContext = igniteQueryContext,
+                    table = Some(igniteSqlRelation.tableName),
+                    tableExpression = None,
+                    outputExpressions = output.map(attr ⇒ attr.withQualifier(Some(igniteSqlRelation.tableName))))
+
+            case project: Project if !stepSkipped && exprsAllowed(project.projectList) ⇒
+                //Project layer just changes output of current query.
+                project.child match {
+                    case acc: SelectAccumulator ⇒
+                        acc.withOutputExpressions(
+                            substituteExpressions(project.projectList, acc.outputExpressions))
+
+                    case other ⇒
+                        unexpectedChild("Project", "SelectAccumulator", other)
+                }
+
+            case sort: Sort if !stepSkipped && isSortPushDownAllowed(sort.order, sort.global) ⇒
+                sort.child match {
+                    case acc: QueryAccumulator ⇒
+                        acc.withOrderBy(sort.order)
+
+                    case other ⇒
+                        unexpectedChild("Sort", "QueryAccumulator", other)
+                }
+
+            case filter: Filter if !stepSkipped && exprsAllowed(filter.condition) ⇒
+
+                filter.child match {
+                    case acc: SelectAccumulator ⇒
+                        //A condition over aggregates, or over an already grouped query, goes to HAVING.
+                        if (hasAggregateInside(filter.condition) || acc.groupBy.isDefined)
+                            acc.withHaving(acc.having.getOrElse(Nil) :+ filter.condition)
+                        else
+                            acc.withWhere(acc.where.getOrElse(Nil) :+ filter.condition)
+
+                    case other ⇒
+                        unexpectedChild("Filter", "SelectAccumulator", other)
+                }
+
+            case agg: Aggregate
+                if !stepSkipped && exprsAllowed(agg.groupingExpressions) && exprsAllowed(agg.aggregateExpressions) ⇒
+
+                agg.child match {
+                    case acc: SelectAccumulator ⇒
+                        if (acc.groupBy.isDefined) {
+                            //Child is grouped already: wrap it as an aliased table expression of a new query.
+                            val tableAlias = acc.igniteQueryContext.uniqueTableAlias
+
+                            accumulator.SingleTableSQLAccumulator(
+                                igniteQueryContext = acc.igniteQueryContext,
+                                table = None,
+                                tableExpression = Some((acc, tableAlias)),
+                                outputExpressions = agg.aggregateExpressions)
+                        }
+                        else
+                            acc
+                                .withGroupBy(agg.groupingExpressions)
+                                .withOutputExpressions(
+                                    substituteExpressions(agg.aggregateExpressions, acc.outputExpressions))
+
+                    case acc: QueryAccumulator ⇒
+                        val tableAlias = acc.igniteQueryContext.uniqueTableAlias
+
+                        accumulator.SingleTableSQLAccumulator(
+                            igniteQueryContext = acc.igniteQueryContext,
+                            table = None,
+                            tableExpression = Some((acc, tableAlias)),
+                            outputExpressions = agg.aggregateExpressions)
+
+                    case other ⇒
+                        unexpectedChild("Aggregate", "QueryAccumulator", other)
+                }
+
+            case limit: LocalLimit if !stepSkipped && exprsAllowed(limit.limitExpr) ⇒
+                limit.child match {
+                    case acc: SelectAccumulator ⇒
+                        acc.withLocalLimit(limit.limitExpr)
+
+                    case other ⇒
+                        unexpectedChild("LocalLimit", "SelectAccumulator", other)
+                }
+
+            case limit: GlobalLimit if !stepSkipped && exprsAllowed(limit.limitExpr) ⇒
+                //NOTE(review): unlike LocalLimit this walks the whole child subtree with `transformUp`, so the limit
+                //is applied to every node reachable through `children` - confirm nested accumulators are intended.
+                limit.child.transformUp {
+                    case acc: SelectAccumulator ⇒
+                        acc.withLimit(limit.limitExpr)
+
+                    case other ⇒
+                        unexpectedChild("GlobalLimit", "SelectAccumulator", other)
+                }
+
+            case union: Union if !stepSkipped && isAllChildrenOptimized(union.children) ⇒
+                //Guard above ensures that every child is a QueryAccumulator.
+                val subQueries = union.children.map(_.asInstanceOf[QueryAccumulator])
+
+                UnionSQLAccumulator(
+                    subQueries.head.igniteQueryContext,
+                    subQueries,
+                    subQueries.head.output)
+
+            case join: Join
+                if !stepSkipped && isAllChildrenOptimized(Seq(join.left, join.right)) &&
+                    join.condition.forall(exprsAllowed) ⇒
+
+                val left = join.left.asInstanceOf[QueryAccumulator]
+
+                //Anything but a simple table accumulator gets a generated alias.
+                val (leftOutput, leftAlias) =
+                    if (!isSimpleTableAcc(left)) {
+                        val tableAlias = left.igniteQueryContext.uniqueTableAlias
+
+                        (left.output, Some(tableAlias))
+                    }
+                    else
+                        (left.output, None)
+
+                val right = join.right.asInstanceOf[QueryAccumulator]
+
+                //Right side is also aliased when its qualifier is equal to the left one.
+                val (rightOutput, rightAlias) =
+                    if (!isSimpleTableAcc(right) ||
+                        leftAlias.getOrElse(left.qualifier) == right.qualifier) {
+                        val tableAlias = right.igniteQueryContext.uniqueTableAlias
+
+                        (right.output, Some(tableAlias))
+                    }
+                    else
+                        (right.output, None)
+
+                JoinSQLAccumulator(
+                    left.igniteQueryContext,
+                    left,
+                    right,
+                    join.joinType,
+                    leftOutput ++ rightOutput,
+                    join.condition,
+                    leftAlias,
+                    rightAlias)
+
+            case unknown ⇒
+                //Unsupported node: keep it and stop pushing down the operators above it.
+                stepSkipped = true
+
+                unknown
+        }
+    }
+
+    /**
+      * Renames ambiguous column names in the output of accumulators that are built on top of a join.
+      *
+      * The plan is traversed top-down. Two kinds of nodes are processed: a `SingleTableSQLAccumulator` that has
+      * a `JoinSQLAccumulator` among its children, and a `JoinSQLAccumulator` whose left or right side is itself
+      * a `JoinSQLAccumulator`. Every other node is returned unchanged.
+      *
+      * @param plan Query plan.
+      * @return Transformed plan.
+      */
+    private def fixAmbiguousOutput(plan: LogicalPlan): LogicalPlan = plan.transformDown {
+        case acc: SingleTableSQLAccumulator if acc.children.exists(_.isInstanceOf[JoinSQLAccumulator]) ⇒
+            //Output of the nested table expression with duplicated names replaced by unique aliases.
+            val fixedChildOutput =
+                fixAmbiguousOutput(acc.tableExpression.get._1.outputExpressions, acc.igniteQueryContext)
+
+            val newOutput = substituteExpressions(acc.outputExpressions, fixedChildOutput, changeOnlyName = true)
+
+            //NOTE(review): the result of `acc.copy(...)` is not used - the original `acc` is returned below,
+            //so none of the substitutions computed here reach the plan. Confirm whether this is intended.
+            acc.copy(
+                outputExpressions = newOutput,
+                where = acc.where.map(
+                    substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+                groupBy = acc.groupBy.map(
+                    substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+                having = acc.having.map(
+                    substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+                limit = acc.limit.map(
+                    substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
+                localLimit = acc.localLimit.map(
+                    substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
+                orderBy = acc.orderBy.map(
+                    substituteExpressions(_, fixedChildOutput, changeOnlyName = true)))
+
+            acc
+
+        case acc: JoinSQLAccumulator
+            if acc.left.isInstanceOf[JoinSQLAccumulator] || acc.right.isInstanceOf[JoinSQLAccumulator] ⇒
+            //Step 1: if the left side is a join, make its output names unique and
+            //rewrite the names used by this join accordingly.
+            val leftFixed = acc.left match {
+                case leftJoin: JoinSQLAccumulator ⇒
+                    val fixedChildOutput = fixAmbiguousOutput(acc.left.outputExpressions, acc.igniteQueryContext)
+
+                    val newOutput =
+                        substituteExpressions(acc.outputExpressions, fixedChildOutput, changeOnlyName = true)
+
+                    acc.copy(
+                        outputExpressions = newOutput,
+                        left = leftJoin.copy(outputExpressions = fixedChildOutput),
+                        condition = acc.condition.map(
+                            substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
+                        where = acc.where.map(
+                            substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+                        groupBy = acc.groupBy.map(
+                            substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+                        having = acc.having.map(
+                            substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+                        limit = acc.limit.map(
+                            substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
+                        localLimit = acc.localLimit.map(
+                            substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
+                        orderBy = acc.orderBy.map(
+                            substituteExpressions(_, fixedChildOutput, changeOnlyName = true)))
+
+                case _ ⇒ acc
+            }
+
+            //Step 2: the same for the right side.
+            //NOTE(review): this branch is not symmetric to the left one - unique names are computed from
+            //`leftFixed.outputExpressions` (not from the right child output), `newOutput` is built without
+            //`changeOnlyName = true` and the clauses are taken from `acc` instead of `leftFixed`. Verify.
+            val fixed = leftFixed.right match {
+                case rightJoin: JoinSQLAccumulator ⇒
+                    val fixedChildOutput =
+                        fixAmbiguousOutput(leftFixed.outputExpressions, leftFixed.igniteQueryContext)
+
+                    val newOutput = substituteExpressions(leftFixed.outputExpressions, fixedChildOutput)
+
+                    leftFixed.copy(
+                        outputExpressions = newOutput,
+                        right = rightJoin.copy(outputExpressions = fixedChildOutput),
+                        condition = acc.condition.map(
+                            substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
+                        where = acc.where.map(
+                            substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+                        groupBy = acc.groupBy.map(
+                            substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+                        having = acc.having.map(
+                            substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+                        limit = acc.limit.map(
+                            substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
+                        localLimit = acc.localLimit.map(
+                            substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
+                        orderBy = acc.orderBy.map(
+                            substituteExpressions(_, fixedChildOutput, changeOnlyName = true)))
+
+                case _ ⇒ leftFixed
+            }
+
+            //Step 3: all clauses are rebuilt once more from the original `acc` clauses,
+            //using the original `acc.outputExpressions` as the substitution source.
+            //NOTE(review): this overrides the clauses computed in steps 1 and 2 - confirm it is intended.
+            fixed.copy(
+                condition = acc.condition.map(
+                    substituteExpression(_, acc.outputExpressions, changeOnlyName = true)),
+                where = acc.where.map(
+                    substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)),
+                groupBy = acc.groupBy.map(
+                    substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)),
+                having = acc.having.map(
+                    substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)),
+                limit = acc.limit.map(
+                    substituteExpression(_, acc.outputExpressions, changeOnlyName = true)),
+                localLimit = acc.localLimit.map(
+                    substituteExpression(_, acc.outputExpressions, changeOnlyName = true)),
+                orderBy = acc.orderBy.map(
+                    substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)))
+
+        case unknown ⇒
+            unknown
+    }
+
+    /**
+      * Makes the names of the given output expressions unique.
+      *
+      * The first expression with a given name is kept as is. Every following expression with a name that was
+      * seen already is wrapped into an `Alias` named by `ctx.uniqueColumnAlias`, keeping its `exprId`.
+      * The result is collected into an ordered collection, so it keeps the order of `exprs`.
+      *
+      * @param exprs Output expressions.
+      * @param ctx Query context used to generate unique column aliases.
+      * @return Expressions in the original order, with repeated names replaced by aliases.
+      */
+    private def fixAmbiguousOutput(exprs: Seq[NamedExpression], ctx: IgniteQueryContext): Seq[NamedExpression] =
+        //A `Vector` is used for the result: an immutable `Set` doesn't guarantee the iteration order,
+        //while the order of output expressions is the order of the query columns.
+        exprs.foldLeft((Set[String](), Vector[NamedExpression]())) {
+            case ((uniqueNames, fixed), next) ⇒
+                if (uniqueNames(next.name))
+                    (uniqueNames, fixed :+ Alias(next, ctx.uniqueColumnAlias(next))(exprId = next.exprId))
+                else
+                    (uniqueNames + next.name, fixed :+ next)
+        }._2
+
+    /**
+      * Replaces each `QueryAccumulator` of the plan with a `LogicalRelation` that wraps
+      * an `IgniteSQLAccumulatorRelation` built from this accumulator.
+      *
+      * @param plan Query plan.
+      * @return Transformed plan.
+      */
+    private def makeIgniteAccRelation(plan: LogicalPlan): LogicalPlan = {
+        //Relation output and catalog table are taken from the accumulator itself.
+        def toRelation(acc: QueryAccumulator): LogicalPlan =
+            new LogicalRelation(
+                relation = IgniteSQLAccumulatorRelation(acc),
+                output = acc.outputExpressions.map(expr ⇒ toAttributeReference(expr, Seq.empty)),
+                catalogTable = acc.igniteQueryContext.catalogTable)
+
+        plan.transformDown {
+            case acc: QueryAccumulator ⇒
+                toRelation(acc)
+        }
+    }
+
+    /**
+      * Checks whether a sort can be executed by Ignite.
+      *
+      * @param order Sort order expressions.
+      * @param global True if the order is applied to the entire result set, false if ordering is per-partition.
+      * @return True if the sort is global and every ordering expression is allowed, false otherwise.
+      */
+    private def isSortPushDownAllowed(order: Seq[SortOrder], global: Boolean): Boolean = {
+        val sortedExprs = order.map(sortOrder ⇒ sortOrder.child)
+
+        global && sortedExprs.forall(sortedExpr ⇒ exprsAllowed(sortedExpr))
+    }
+
+    /**
+      * Checks that every given plan has been turned into an accumulator already.
+      *
+      * @param children Plans to check.
+      * @return True if all plans are `QueryAccumulator`, false otherwise.
+      */
+    private def isAllChildrenOptimized(children: Seq[LogicalPlan]): Boolean =
+        children.forall(child ⇒ child.isInstanceOf[QueryAccumulator])
+
+    /**
+      * Applies `substituteExpression` to every expression of `exprs`:
+      * an expression is replaced by the expression of `substitution` that has the same `exprId`.
+      *
+      * @param exprs Expressions to substitute.
+      * @param substitution Expressions for substitution.
+      * @param changeOnlyName If true substitute only expression name.
+      * @tparam T Concrete expression type.
+      * @return Substituted expressions, in the order of `exprs`.
+      */
+    private def substituteExpressions[T <: Expression](exprs: Seq[T], substitution: Seq[NamedExpression],
+        changeOnlyName: Boolean = false): Seq[T] =
+        for (expr ← exprs)
+            yield substituteExpression(expr, substitution, changeOnlyName)
+
+    /**
+      * Replaces `expr` with the expression of `substitution` that has the same `exprId`.
+      * If `expr` is not a named expression, or no expression with its `exprId` is found,
+      * the substitution is applied recursively to the children of `expr`.
+      *
+      * @param expr Expression to substitute.
+      * @param substitution Expressions for substitution.
+      * @param changeOnlyName If true only the name (and the qualifier) of the found expression is used.
+      * @tparam T Concrete expression type.
+      * @return Substituted expression.
+      */
+    private def substituteExpression[T <: Expression](expr: T, substitution: Seq[NamedExpression],
+        changeOnlyName: Boolean): T = {
+        //Nothing to replace at this level: rebuild the expression from its substituted children.
+        def substituteChildren: T =
+            expr.withNewChildren(
+                substituteExpressions(expr.children, substitution, changeOnlyName)).asInstanceOf[T]
+
+        expr match {
+            case named: NamedExpression ⇒
+                substitution.find(candidate ⇒ candidate.exprId == named.exprId) match {
+                    case None ⇒
+                        substituteChildren
+
+                    case Some(found) if !changeOnlyName ⇒
+                        found.asInstanceOf[T]
+
+                    case Some(found) ⇒
+                        named match {
+                            case alias: Alias ⇒
+                                //The alias itself is kept, it is just pointed to a reference to the found expression.
+                                val renamedRef = AttributeReference(
+                                    found.name,
+                                    found.dataType,
+                                    nullable = found.nullable,
+                                    metadata = found.metadata)(
+                                    exprId = found.exprId,
+                                    qualifier = found.qualifier,
+                                    isGenerated = found.isGenerated)
+
+                                Alias(renamedRef, alias.name)(
+                                    exprId = alias.exprId,
+                                    qualifier = alias.qualifier,
+                                    explicitMetadata = alias.explicitMetadata,
+                                    isGenerated = alias.isGenerated).asInstanceOf[T]
+
+                            case attr: AttributeReference ⇒
+                                attr.copy(name = found.name)(
+                                    exprId = found.exprId,
+                                    qualifier = found.qualifier,
+                                    isGenerated = found.isGenerated).asInstanceOf[T]
+
+                            case _ ⇒
+                                named.asInstanceOf[T]
+                        }
+                }
+
+            case _ ⇒
+                substituteChildren
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
index 8860590..1fccc3a 100644
--- a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
+++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
@@ -66,8 +66,14 @@ class IgniteSparkSession private(ic: IgniteContext, proxy: SparkSession) extends
         new IgniteSharedState(ic, sparkContext)
 
     /** @inheritdoc */
-    @transient override lazy val sessionState: SessionState =
-        new SessionStateBuilder(self, None).build()
+    @transient override lazy val sessionState: SessionState = {
+        val builtState = new SessionStateBuilder(self, None).build()
+
+        //Append the Ignite rule to the extra optimizations that are registered already.
+        val experimental = builtState.experimentalMethods
+
+        experimental.extraOptimizations = experimental.extraOptimizations :+ IgniteOptimization
+
+        builtState
+    }
 
     /** @inheritdoc */
     @transient override lazy val conf: RuntimeConfig = proxy.conf

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
index 8613592..29a4e6f 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
@@ -19,13 +19,16 @@ package org.apache.ignite.spark
 
 import org.apache.ignite.{Ignite, Ignition}
 import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
-import org.apache.spark.sql.SparkSession
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSpec, Matchers}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.scalatest._
 import java.lang.{Long ⇒ JLong}
 
 import org.apache.ignite.cache.query.SqlFieldsQuery
 import org.apache.ignite.cache.query.annotations.QuerySqlField
-import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.internal.IgnitionEx.loadConfiguration
+import org.apache.ignite.spark.AbstractDataFrameSpec.configuration
+import org.apache.ignite.spark.impl.IgniteSQLAccumulatorRelation
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.ignite.spark.AbstractDataFrameSpec._
 
 import scala.annotation.meta.field
@@ -33,12 +36,13 @@ import scala.reflect.ClassTag
 
 /**
   */
-abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAndAfterAll with BeforeAndAfter {
+abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAndAfterAll with BeforeAndAfter
+    with Assertions {
     var spark: SparkSession = _
 
     var client: Ignite = _
 
-    override protected def beforeAll() = {
+    override protected def beforeAll(): Unit = {
         for (i ← 0 to 3)
             Ignition.start(configuration("grid-" + i, client = false))
 
@@ -47,7 +51,7 @@ abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAn
         createSparkSession()
     }
 
-    override protected def afterAll() = {
+    override protected def afterAll(): Unit = {
         Ignition.stop("client", false)
 
         for (i ← 0 to 3)
@@ -108,6 +112,7 @@ abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAn
         cache.query(qry.setArgs(1L.asInstanceOf[JLong], "Forest Hill")).getAll
         cache.query(qry.setArgs(2L.asInstanceOf[JLong], "Denver")).getAll
         cache.query(qry.setArgs(3L.asInstanceOf[JLong], "St. Petersburg")).getAll
+        cache.query(qry.setArgs(4L.asInstanceOf[JLong], "St. Petersburg")).getAll
     }
 
     def createEmployeeCache(client: Ignite, cacheName: String): Unit = {
@@ -119,6 +124,31 @@ abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAn
         cache.put("key2", Employee(2, "Sarah Connor", 32, 10000))
         cache.put("key3", Employee(3, "Arnold Schwarzenegger", 27, 1000))
     }
+
+    //Checks query result, ordering the rows by the value of their first column.
+    def checkQueryData[T](res: DataFrame, expectedRes: Product)
+        (implicit ord: T ⇒ Ordered[T]): Unit = {
+        val firstColumn: Row ⇒ T = row ⇒ row.getAs[T](0)
+
+        checkQueryData(res, expectedRes, firstColumn)
+    }
+
+    //Collects the rows of `res`, sorts them with `sorter` and compares them with the elements of `expectedRes`.
+    //An expected element is a plain value for a single column row and a Product (tuple) otherwise.
+    def checkQueryData[Ordered](res: DataFrame, expectedRes: Product, sorter: Row => Ordered)
+        (implicit ord: Ordering[Ordered]): Unit = {
+        val sortedRows = res.rdd.collect.sortBy(sorter)
+
+        (0 until expectedRes.productArity).foreach { i ⇒
+            val row = sortedRows(i)
+
+            val expected = expectedRes.productElement(i)
+
+            if (row.size != 1) {
+                val expectedRow: Product = expected.asInstanceOf[Product]
+
+                assert(expectedRow.productArity == row.size,
+                    s"Rows size should be equal, but expected.size=${expectedRow.productArity} and row.size=${row.size}")
+
+                (0 until expectedRow.productArity).foreach { j ⇒
+                    assert(row(j) == expectedRow.productElement(j),
+                        s"row[$i, $j] = ${row(j)} should be equal ${expectedRow.productElement(j)}")
+                }
+            }
+            else
+                assert(row(0) == expected, s"row[$i, 0] = ${row(0)} should be equal $expected")
+        }
+    }
 }
 
 object AbstractDataFrameSpec {
@@ -135,7 +165,7 @@ object AbstractDataFrameSpec {
     val PERSON_TBL_NAME_2 = "person2"
 
     def configuration(igniteInstanceName: String, client: Boolean): IgniteConfiguration = {
-        val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
+        val cfg = loadConfiguration(TEST_CONFIG_FILE).get1()
 
         cfg.setClientMode(client)
 
@@ -167,6 +197,30 @@ object AbstractDataFrameSpec {
     }
 
     /**
+      * @param df Data frame.
+      * @param qry SQL Query.
+      */
+    def checkOptimizationResult(df: DataFrame, qry: String = ""): Unit = {
+        df.explain(true)
+
+        //Leaves of the optimized plan that were produced by the Ignite optimization.
+        val igniteRelations = df.queryExecution.optimizedPlan.collectLeaves.collect {
+            case LogicalRelation(relation: IgniteSQLAccumulatorRelation[_, _], _, _) ⇒
+                relation
+        }
+
+        //Generated SQL is compared (ignoring the case) only if an expected query is passed.
+        if (qry != "")
+            igniteRelations.foreach { relation ⇒
+                assert(qry.toLowerCase == relation.acc.compileQuery().toLowerCase,
+                    s"Generated query should be equal to expected.\nexpected  - $qry\ngenerated - ${relation.acc.compileQuery()}")
+            }
+
+        val cnt = igniteRelations.size
+
+        assert(cnt != 0, "Plan should contains IgniteSQLAccumulatorRelation")
+    }
+
+    /**
       * Encloses a closure so that it doesn't capture the outer object (default Scala behaviour) while being serialized.
       */
     def enclose[E, R](enclosed: E)(func: E => R): R = func(enclosed)

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
index 6077211..d87d234 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
@@ -58,7 +58,7 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec {
         it("Should provide ability to query SQL table without explicit registration") {
             val res = igniteSession.sql("SELECT id, name FROM city").rdd
 
-            res.count should equal(3)
+            res.count should equal(4)
 
             val cities = res.collect.sortBy(_.getAs[JLong]("id"))
 
@@ -66,7 +66,8 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec {
                 Array(
                     (1, "Forest Hill"),
                     (2, "Denver"),
-                    (3, "St. Petersburg")
+                    (3, "St. Petersburg"),
+                    (4, "St. Petersburg")
                 )
             )
         }
@@ -136,7 +137,7 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec {
 
         createEmployeeCache(client, EMPLOYEE_CACHE_NAME)
 
-        val configProvider = enclose(null) (x ⇒ () ⇒ {
+        val configProvider = enclose(null) (_ ⇒ () ⇒ {
             val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
 
             cfg.setClientMode(true)

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
index cdd26cd..c5df901 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
@@ -17,9 +17,6 @@
 
 package org.apache.ignite.spark
 
-import java.lang.{Integer ⇒ JInteger, String ⇒ JString}
-
-import org.apache.ignite.Ignite
 import org.apache.ignite.spark.AbstractDataFrameSpec._
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.types._
@@ -45,7 +42,7 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec {
                     ("IS_RESIDENT", BooleanType, true),
                     ("SALARY", DoubleType, true),
                     ("PENSION", DoubleType, true),
-                    ("ACCOUNT", DecimalType(10, 0), true),
+                    ("ACCOUNT", IgniteRDD.DECIMAL, true),
                     ("AGE", IntegerType, true),
                     ("ID", LongType, false),
                     ("CITY_ID", LongType, false))

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
index 2ceb44a..b3f7026 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
@@ -28,5 +28,12 @@ class IgniteDataFrameSuite extends Suites (
     new IgniteSQLDataFrameWriteSpec,
     new IgniteSQLDataFrameIgniteSessionWriteSpec,
     new IgniteDataFrameWrongConfigSpec,
-    new IgniteCatalogSpec
+    new IgniteCatalogSpec,
+    new IgniteOptimizationSpec,
+    new IgniteOptimizationStringFuncSpec,
+    new IgniteOptimizationMathFuncSpec,
+    new IgniteOptimizationAggregationFuncSpec,
+    new IgniteOptimizationSystemFuncSpec,
+    new IgniteOptimizationJoinSpec,
+    new IgniteOptimizationDisableEnableSpec
 )

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationAggregationFuncSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationAggregationFuncSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationAggregationFuncSpec.scala
new file mode 100644
index 0000000..d2527c8
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationAggregationFuncSpec.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import java.lang.{Double ⇒ JDouble, Long ⇒ JLong}
+
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose}
+import org.apache.spark.sql.ignite.IgniteSparkSession
+
+/**
+  * Checks that Spark SQL aggregate functions over the `numbers` Ignite table return the expected
+  * values and that each query passes `checkOptimizationResult` with the expected Ignite SQL text.
+  */
+@RunWith(classOf[JUnitRunner])
+class IgniteOptimizationAggregationFuncSpec extends AbstractDataFrameSpec {
+    /** Session the test queries are executed with; assigned in `beforeAll`. */
+    var igniteSession: IgniteSparkSession = _
+
+    describe("Supported optimized aggregation functions") {
+        it("COUNT") {
+            val df = igniteSession.sql("SELECT count(*) FROM numbers")
+
+            checkOptimizationResult(df, "SELECT count(1) FROM numbers")
+
+            val data = Tuple1(21)
+
+            checkQueryData(df, data)
+        }
+
+        it("AVG - DECIMAL") {
+            // TODO: write me. Marked pending so that the empty body is not reported as a passed test.
+            pending
+        }
+
+        it("AVG - DOUBLE") {
+            val df = igniteSession.sql("SELECT AVG(val) FROM numbers WHERE id <= 3")
+
+            checkOptimizationResult(df, "SELECT AVG(val) FROM numbers WHERE id IS NOT NULL and id <= 3")
+
+            val data = Tuple1(.5)
+
+            checkQueryData(df, data)
+        }
+
+        it("MIN - DOUBLE") {
+            val df = igniteSession.sql("SELECT MIN(val) FROM numbers")
+
+            checkOptimizationResult(df, "SELECT MIN(val) FROM numbers")
+
+            val data = Tuple1(-1.0)
+
+            checkQueryData(df, data)
+        }
+
+        it("MAX - DOUBLE") {
+            val df = igniteSession.sql("SELECT MAX(val) FROM numbers")
+
+            checkOptimizationResult(df, "SELECT MAX(val) FROM numbers")
+
+            val data = Tuple1(180.0)
+
+            checkQueryData(df, data)
+        }
+
+        it("SUM - DOUBLE") {
+            val df = igniteSession.sql("SELECT SUM(val) FROM numbers WHERE id <= 3")
+
+            checkOptimizationResult(df, "SELECT SUM(val) FROM numbers WHERE id IS NOT NULL and id <= 3")
+
+            val data = Tuple1(1.5)
+
+            checkQueryData(df, data)
+        }
+
+        it("SUM - DECIMAL - 1") {
+            val df = igniteSession.sql("SELECT SUM(decimal_val) FROM numbers WHERE id IN (18, 19, 20)")
+
+            checkOptimizationResult(df, "SELECT SUM(decimal_val) FROM numbers WHERE id IN (18, 19, 20)")
+
+            val data = Tuple1(new java.math.BigDecimal("10.5").setScale(3))
+
+            checkQueryData(df, data)
+        }
+
+        it("SUM - DECIMAL - 2") {
+            val df = igniteSession.sql("SELECT SUM(decimal_val) FROM numbers WHERE id IN (18, 19, 20, 21)")
+
+            checkOptimizationResult(df, "SELECT SUM(decimal_val) FROM numbers WHERE id IN (18, 19, 20, 21)")
+
+            val data = Tuple1(new java.math.BigDecimal(15).setScale(3))
+
+            checkQueryData(df, data)
+        }
+
+        it("SUM - LONG") {
+            val df = igniteSession.sql("SELECT SUM(int_val) FROM numbers WHERE id in (15, 16, 17)")
+
+            checkOptimizationResult(df, "SELECT CAST(SUM(int_val) AS BIGINT) as \"SUM(int_val)\" " +
+                "FROM numbers WHERE id in (15, 16, 17)")
+
+            val data = Tuple1(6L)
+
+            checkQueryData(df, data)
+        }
+    }
+
+    /**
+      * Creates the `numbers` table and fills it with the rows queried by the tests of this spec.
+      *
+      * Ids 1-14 carry only `val`, ids 15-17 only `int_val` and ids 18-21 only `decimal_val`.
+      *
+      * @param client Ignite node used to obtain the cache.
+      * @param cacheName Name of the cache the SQL statements are executed through.
+      */
+    def createNumberTable(client: Ignite, cacheName: String): Unit = {
+        val cache = client.cache(cacheName)
+
+        cache.query(new SqlFieldsQuery(
+            """
+              | CREATE TABLE numbers (
+              |    id LONG,
+              |    val DOUBLE,
+              |    int_val LONG,
+              |    decimal_val DECIMAL(5, 5),
+              |    PRIMARY KEY (id)) WITH "backups=1"
+            """.stripMargin)).getAll
+
+        val valQry = new SqlFieldsQuery("INSERT INTO numbers (id, val) values (?, ?)")
+
+        cache.query(valQry.setArgs(1L.asInstanceOf[JLong], .0.asInstanceOf[JDouble])).getAll
+        cache.query(valQry.setArgs(2L.asInstanceOf[JLong], .5.asInstanceOf[JDouble])).getAll
+        cache.query(valQry.setArgs(3L.asInstanceOf[JLong], 1.0.asInstanceOf[JDouble])).getAll
+        cache.query(valQry.setArgs(4L.asInstanceOf[JLong], 2.0.asInstanceOf[JDouble])).getAll
+        cache.query(valQry.setArgs(5L.asInstanceOf[JLong], 4.0.asInstanceOf[JDouble])).getAll
+        cache.query(valQry.setArgs(6L.asInstanceOf[JLong], -0.5.asInstanceOf[JDouble])).getAll
+        cache.query(valQry.setArgs(7L.asInstanceOf[JLong], -1.0.asInstanceOf[JDouble])).getAll
+        cache.query(valQry.setArgs(8L.asInstanceOf[JLong], 42.0.asInstanceOf[JDouble])).getAll
+        cache.query(valQry.setArgs(9L.asInstanceOf[JLong], .51.asInstanceOf[JDouble])).getAll
+        cache.query(valQry.setArgs(10L.asInstanceOf[JLong], .49.asInstanceOf[JDouble])).getAll
+        cache.query(valQry.setArgs(11L.asInstanceOf[JLong], 100.0.asInstanceOf[JDouble])).getAll
+        cache.query(valQry.setArgs(12L.asInstanceOf[JLong], (Math.E*Math.E).asInstanceOf[JDouble])).getAll
+        cache.query(valQry.setArgs(13L.asInstanceOf[JLong], Math.PI.asInstanceOf[JDouble])).getAll
+        cache.query(valQry.setArgs(14L.asInstanceOf[JLong], 180.0.asInstanceOf[JDouble])).getAll
+
+        val intValQry = new SqlFieldsQuery("INSERT INTO numbers (id, int_val) values (?, ?)")
+
+        cache.query(intValQry.setArgs(15L.asInstanceOf[JLong], 1L.asInstanceOf[JLong])).getAll
+        cache.query(intValQry.setArgs(16L.asInstanceOf[JLong], 2L.asInstanceOf[JLong])).getAll
+        cache.query(intValQry.setArgs(17L.asInstanceOf[JLong], 3L.asInstanceOf[JLong])).getAll
+
+        val decimalValQry = new SqlFieldsQuery("INSERT INTO numbers (id, decimal_val) values (?, ?)")
+
+        cache.query(decimalValQry.setArgs(18L.asInstanceOf[JLong], new java.math.BigDecimal("2.5"))).getAll
+        cache.query(decimalValQry.setArgs(19L.asInstanceOf[JLong], new java.math.BigDecimal("3.5"))).getAll
+        cache.query(decimalValQry.setArgs(20L.asInstanceOf[JLong], new java.math.BigDecimal("4.5"))).getAll
+        cache.query(decimalValQry.setArgs(21L.asInstanceOf[JLong], new java.math.BigDecimal("4.5"))).getAll
+    }
+
+    /**
+      * Creates the test table and an `IgniteSparkSession` configured as a client node named "client-2".
+      */
+    override protected def beforeAll(): Unit = {
+        super.beforeAll()
+
+        createNumberTable(client, DEFAULT_CACHE)
+
+        val configProvider = enclose(null) (x ⇒ () ⇒ {
+            val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
+
+            cfg.setClientMode(true)
+
+            cfg.setIgniteInstanceName("client-2")
+
+            cfg
+        })
+
+        igniteSession = IgniteSparkSession.builder()
+            .config(spark.sparkContext.getConf)
+            .igniteConfigProvider(configProvider)
+            .getOrCreate()
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDateFuncSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDateFuncSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDateFuncSpec.scala
new file mode 100644
index 0000000..7912cd0
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDateFuncSpec.scala
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose}
+import org.apache.spark.sql.ignite.IgniteSparkSession
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import java.lang.{Long ⇒ JLong}
+import java.util.{Date ⇒ JDate}
+import java.text.SimpleDateFormat
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.TimeUnit.DAYS
+
+/**
+  * Checks Spark SQL date and time functions executed over the `dates` Ignite table.
+  *
+  * Every query is passed to `checkOptimizationResult` and its result is compared with the value
+  * expected for the single row inserted by `createDateTable`.
+  */
+@RunWith(classOf[JUnitRunner])
+class IgniteOptimizationDateFuncSpec extends AbstractDataFrameSpec {
+    /** Session the test queries are executed with; assigned in `beforeAll`. */
+    var igniteSession: IgniteSparkSession = _
+
+    /** Format used to build both the stored test date and the expected dates. */
+    val format = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss")
+
+    describe("Supported optimized date functions") {
+        it(" - CURRENT_TIMESTAMP") {
+            val df = igniteSession.sql("SELECT id, CURRENT_TIMESTAMP() FROM dates WHERE id = 1")
+
+            checkOptimizationResult(df)
+
+            val data = df.rdd.collect
+
+            assert(data(0).getAs[JLong]("id") == 1L)
+
+            val date: JDate = data(0).getAs[JDate]("current_timestamp()")
+            val millisDiff = new JDate().getTime - date.getTime
+
+            assert(millisDiff <= 30000)
+        }
+
+        it(" - CURRENT_DATE") {
+            val df = igniteSession.sql("SELECT id, CURRENT_DATE() FROM dates WHERE id = 1")
+
+            checkOptimizationResult(df)
+
+            val data = df.rdd.collect
+
+            assert(data(0).getAs[JLong]("id") == 1L)
+
+            val date: JDate = data(0).getAs[JDate]("current_date()")
+            val dayDiff = DAYS.convert(new JDate().getTime - date.getTime, TimeUnit.MILLISECONDS)
+
+            assert(dayDiff <= 1)
+        }
+
+        it(" - HOUR") {
+            val df = igniteSession.sql("SELECT HOUR(val) FROM dates WHERE id = 1")
+
+            checkOptimizationResult(df)
+
+            val data = Tuple1(0)
+
+            checkQueryData(df, data)
+        }
+
+        it(" - MINUTE") {
+            val df = igniteSession.sql("SELECT MINUTE(val) FROM dates WHERE id = 1")
+
+            checkOptimizationResult(df)
+
+            val data = Tuple1(0)
+
+            checkQueryData(df, data)
+        }
+
+        it(" - SECOND") {
+            val df = igniteSession.sql("SELECT SECOND(val) FROM dates WHERE id = 1")
+
+            checkOptimizationResult(df)
+
+            val data = Tuple1(0)
+
+            checkQueryData(df, data)
+        }
+
+        it(" - MONTH") {
+            val df = igniteSession.sql("SELECT MONTH(val) FROM dates WHERE id = 1")
+
+            checkOptimizationResult(df)
+
+            // The row with id = 1 holds 01.01.2017; MONTH numbers months from 1, so January is 1.
+            val data = Tuple1(1)
+
+            checkQueryData(df, data)
+        }
+
+        it(" - YEAR") {
+            val df = igniteSession.sql("SELECT YEAR(val) FROM dates WHERE id = 1")
+
+            checkOptimizationResult(df)
+
+            val data = Tuple1(2017)
+
+            checkQueryData(df, data)
+        }
+
+        it(" - QUARTER") {
+            val df = igniteSession.sql("SELECT QUARTER(val) FROM dates WHERE id = 1")
+
+            checkOptimizationResult(df)
+
+            val data = Tuple1(1)
+
+            checkQueryData(df, data)
+        }
+
+        it(" - WEEK") {
+            val df = igniteSession.sql("SELECT WEEKOFYEAR(val) FROM dates WHERE id = 1")
+
+            checkOptimizationResult(df)
+
+            val data = Tuple1(1)
+
+            checkQueryData(df, data)
+        }
+
+        it(" - DAY_OF_MONTH") {
+            val df = igniteSession.sql("SELECT DAYOFMONTH(val) FROM dates WHERE id = 1")
+
+            checkOptimizationResult(df)
+
+            val data = Tuple1(1)
+
+            checkQueryData(df, data)
+        }
+
+        it(" - DAY_OF_YEAR") {
+            val df = igniteSession.sql("SELECT DAYOFYEAR(val) FROM dates WHERE id = 1")
+
+            checkOptimizationResult(df)
+
+            val data = Tuple1(1)
+
+            checkQueryData(df, data)
+        }
+
+        it(" - DATE_ADD") {
+            val df = igniteSession.sql("SELECT DATE_ADD(val, 2) FROM dates WHERE id = 1")
+
+            checkOptimizationResult(df)
+
+            val data = Tuple1(format.parse("03.01.2017 00:00:00"))
+
+            checkQueryData(df, data)
+        }
+
+        it(" - DATEDIFF") {
+            val df = igniteSession.sql("SELECT " +
+                "DATEDIFF(val, TO_DATE('2017-01-02 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS')) FROM dates WHERE id = 1")
+
+            checkOptimizationResult(df)
+
+            // NOTE(review): Spark's DATEDIFF(end, start) is `end - start`, so 2017-01-01 against
+            // 2017-01-02 would give -1 rather than 1 - confirm the expected value.
+            val data = Tuple1(1)
+
+            checkQueryData(df, data)
+        }
+
+        it(" - FORMATDATETIME") {
+            val df = igniteSession.sql("SELECT DATE_FORMAT(val, 'yyyy-MM-dd HH:mm:ss.SSS') FROM dates WHERE id = 1")
+
+            checkOptimizationResult(df)
+
+            val data = Tuple1("2017-01-01 00:00:00.000")
+
+            checkQueryData(df, data)
+        }
+    }
+
+    /**
+      * Creates the `dates` table and inserts the single row (id = 1, val = 01.01.2017) the tests query.
+      *
+      * @param client Ignite node used to obtain the cache.
+      * @param cacheName Name of the cache the SQL statements are executed through.
+      */
+    def createDateTable(client: Ignite, cacheName: String): Unit = {
+        val cache = client.cache(cacheName)
+
+        cache.query(new SqlFieldsQuery(
+            """
+              | CREATE TABLE dates (
+              |    id LONG,
+              |    val DATE,
+              |    PRIMARY KEY (id)) WITH "backups=1"
+            """.stripMargin)).getAll
+
+        val qry = new SqlFieldsQuery("INSERT INTO dates(id, val) values (?, ?)")
+
+        cache.query(qry.setArgs(1L.asInstanceOf[JLong], format.parse("01.01.2017 00:00:00"))).getAll
+    }
+
+    /**
+      * Creates the test table and an `IgniteSparkSession` configured as a client node named "client-2".
+      */
+    override protected def beforeAll(): Unit = {
+        super.beforeAll()
+
+        createDateTable(client, DEFAULT_CACHE)
+
+        val configProvider = enclose(null) (x ⇒ () ⇒ {
+            val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
+
+            cfg.setClientMode(true)
+
+            cfg.setIgniteInstanceName("client-2")
+
+            cfg
+        })
+
+        igniteSession = IgniteSparkSession.builder()
+            .config(spark.sparkContext.getConf)
+            .igniteConfigProvider(configProvider)
+            .getOrCreate()
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDisableEnableSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDisableEnableSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDisableEnableSpec.scala
new file mode 100644
index 0000000..033af74
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDisableEnableSpec.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.spark.AbstractDataFrameSpec.TEST_CONFIG_FILE
+import org.apache.ignite.spark.IgniteDataFrameSettings._
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.ignite.IgniteOptimization
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+
+/**
+  * Checks when `IgniteOptimization` is added to, and removed from, the extra optimizations of a
+  * Spark session, depending on the `OPTION_DISABLE_SPARK_SQL_OPTIMIZATION` setting.
+  */
+@RunWith(classOf[JUnitRunner])
+class IgniteOptimizationDisableEnableSpec extends AbstractDataFrameSpec {
+    /** Data frame loaded by the first test. */
+    var personDataFrame: DataFrame = _
+
+    describe("Ignite Optimization Disabling/Enabling") {
+        it("should add Ignite Optimization to a session on a first query") {
+            if (spark.sparkContext.isStopped)
+                createSparkSession()
+
+            assert(!igniteOptimizationExists(spark), "Session shouldn't contains IgniteOptimization")
+
+            personDataFrame = loadPersonDataFrame(spark)
+
+            assert(igniteOptimizationExists(spark),
+                "Session should contains IgniteOptimization after executing query over Ignite Data Frame")
+
+            spark.stop()
+        }
+
+        it("should remove Ignite Optimization if it disabled at runtime") {
+            if (!spark.sparkContext.isStopped)
+                spark.stop()
+
+            val newSession = SparkSession.builder()
+                .appName("Ignite Optimization check")
+                .master("local")
+                .config("spark.executor.instances", "2")
+                .getOrCreate()
+
+            // Close the session even if an assertion fails: a session that is still running would be
+            // returned by `getOrCreate()` in the next test instead of a fresh one.
+            try {
+                assert(!igniteOptimizationExists(newSession), "Session shouldn't contains IgniteOptimization")
+
+                loadPersonDataFrame(newSession)
+
+                assert(igniteOptimizationExists(newSession),
+                    "Session should contains IgniteOptimization after executing query over Ignite Data Frame")
+
+                newSession.conf.set(OPTION_DISABLE_SPARK_SQL_OPTIMIZATION, "true")
+
+                // Loading a data frame again after disabling the optimization is expected to remove it.
+                loadPersonDataFrame(newSession)
+
+                assert(!igniteOptimizationExists(newSession),
+                    "Session shouldn't contains IgniteOptimization")
+            }
+            finally {
+                newSession.close()
+            }
+        }
+
+        it("shouldn't add Ignite Optimization to a session when it's disabled") {
+            if (!spark.sparkContext.isStopped)
+                spark.stop()
+
+            val newSession = SparkSession.builder()
+                .appName("Ignite Optimization check")
+                .master("local")
+                .config("spark.executor.instances", "2")
+                .config(OPTION_DISABLE_SPARK_SQL_OPTIMIZATION, "true")
+                .getOrCreate()
+
+            // Close the session even if an assertion fails, so it is not left running after the test.
+            try {
+                assert(!igniteOptimizationExists(newSession), "Session shouldn't contains IgniteOptimization")
+
+                loadPersonDataFrame(newSession).createOrReplaceTempView("person")
+
+                val res = newSession.sqlContext.sql("SELECT name FROM person WHERE id = 2").rdd
+
+                res.count should equal(1)
+
+                assert(!igniteOptimizationExists(newSession), "Session shouldn't contains IgniteOptimization")
+            }
+            finally {
+                newSession.close()
+            }
+        }
+    }
+
+    /**
+      * Tells whether `IgniteOptimization` is among the extra optimizations of the session.
+      *
+      * @param session Session to inspect.
+      * @return `true` if the session's extra optimizations contain `IgniteOptimization`.
+      */
+    def igniteOptimizationExists(session: SparkSession): Boolean =
+        session.sessionState.experimentalMethods.extraOptimizations.contains(IgniteOptimization)
+
+    /**
+      * Loads the `person` table as an Ignite data frame through the given session.
+      *
+      * @param session Session used to read the table.
+      * @return Data frame over the `person` table.
+      */
+    private def loadPersonDataFrame(session: SparkSession): DataFrame =
+        session.read
+            .format(FORMAT_IGNITE)
+            .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+            .option(OPTION_TABLE, "person")
+            .load()
+
+    /** Calls `createPersonTable` for the cache "cache1" before the tests are run. */
+    override protected def beforeAll(): Unit = {
+        super.beforeAll()
+
+        createPersonTable(client, "cache1")
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationJoinSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationJoinSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationJoinSpec.scala
new file mode 100644
index 0000000..b4b36a8
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationJoinSpec.scala
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose}
+import org.apache.spark.sql.ignite.IgniteSparkSession
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+
+import java.lang.{Long ⇒ JLong}
+
+/**
+  */
+@RunWith(classOf[JUnitRunner])
+class IgniteOptimizationJoinSpec extends AbstractDataFrameSpec {
+    var igniteSession: IgniteSparkSession = _
+
+    describe("Optimized join queries") {
+        it("UNION") {
+            val qry =
+                """
+                  | SELECT id, val1 as val FROM jt1 UNION
+                  | SELECT id, val2 as val FROM jt2 UNION
+                  | SELECT id, val3 as val FROM jt3
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+            checkOptimizationResult(df,
+                "SELECT id, val FROM (SELECT id, val1 as val FROM jt1 UNION " +
+                    "SELECT id, val2 as val FROM jt2 UNION " +
+                    "SELECT id, val3 as val FROM jt3) table1")
+
+            val data = (
+                (1L, "A"),
+                (1L, "B"),
+                (2L, "B"),
+                (2L, "C"),
+                (2L, "D"),
+                (3L, "C"),
+                (3L, "D"),
+                (3L, "E"))
+
+            checkQueryData(df, data, row ⇒ (row.getAs[JLong](0), row.getAs[String](1)))
+        }
+
+        it("UNION ALL") {
+            val qry =
+                """
+                  | SELECT id, val1 as val FROM jt1 UNION ALL
+                  | SELECT id, val2 as val FROM jt2 UNION ALL
+                  | SELECT id, val3 as val FROM jt3
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+            checkOptimizationResult(df,
+                "SELECT id, val1 as val FROM jt1 UNION " +
+                    "SELECT id, val2 as val FROM jt2 UNION " +
+                    "SELECT id, val3 as val FROM jt3")
+
+            val data = (
+                (1L, "A"),
+                (1L, "B"),
+                (2L, "B"),
+                (2L, "C"),
+                (2L, "D"),
+                (3L, "C"),
+                (3L, "D"),
+                (3L, "E"))
+
+            checkQueryData(df, data, row ⇒ (row.getAs[JLong](0), row.getAs[String](1)))
+        }
+
+        it("UNION ALL ORDER") {
+            val qry =
+                """
+                  | SELECT id, val1 as val FROM jt1 UNION ALL
+                  | SELECT id, val2 as val FROM jt2 UNION ALL
+                  | SELECT id, val3 as val FROM jt3
+                  | ORDER BY id DESC, val
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+            checkOptimizationResult(df,
+                "SELECT id, val1 as val FROM jt1 UNION " +
+                    "SELECT id, val2 as val FROM jt2 UNION " +
+                    "SELECT id, val3 as val FROM jt3 " +
+                    "ORDER BY id DESC, val")
+
+            val data = (
+                (3L, "C"),
+                (3L, "D"),
+                (3L, "E"),
+                (2L, "B"),
+                (2L, "C"),
+                (2L, "D"),
+                (1L, "A"),
+                (1L, "B")
+            )
+
+            checkQueryData(df, data, _ ⇒ 0)
+        }
+
+        it("UNION WITH AGGREGATE") {
+            val qry =
+                """
+                  | SELECT VAL, COUNT(*) FROM (
+                  |     SELECT id, val1 as val FROM jt1 UNION
+                  |     SELECT id, val2 as val FROM jt2 UNION
+                  |     SELECT id, val3 as val FROM jt3 ) t1
+                  | GROUP BY val HAVING COUNT(*) > 1
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+            checkOptimizationResult(df,
+                "SELECT VAL, count(1) FROM (" +
+                    "SELECT id, val1 AS val FROM JT1 UNION " +
+                    "SELECT id, val2 AS val FROM JT2 UNION " +
+                    "SELECT id, val3 AS val FROM JT3" +
+                ") table1 GROUP BY val HAVING count(1) > 1")
+
+            val data = (
+                ("B", 2L),
+                ("C", 2L),
+                ("D", 2L)
+            )
+
+            checkQueryData(df, data)
+        }
+
+        it("AGGREGATE ON AGGREGATE RESULT") {
+            val qry =
+                """
+                  | SELECT SUM(cnt) FROM (
+                  |     SELECT VAL, COUNT(*) as CNT FROM (
+                  |         SELECT id, val1 as val FROM jt1 UNION
+                  |         SELECT id, val2 as val FROM jt2 UNION
+                  |         SELECT id, val3 as val FROM jt3 ) t1
+                  |     GROUP BY val HAVING COUNT(*) > 1
+                  | ) t1
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+            checkOptimizationResult(df,
+                "SELECT CAST(SUM(cnt) as BIGINT) as \"SUM(cnt)\" FROM (" +
+                    "SELECT count(1) as cnt FROM (" +
+                        "SELECT id, val1 as val FROM jt1 UNION " +
+                        "SELECT id, val2 as val FROM jt2 UNION " +
+                        "SELECT id, val3 as val FROM jt3" +
+                    ") table1 GROUP BY val HAVING count(1) > 1) table2")
+
+            val data = Tuple1(6.0)
+
+            checkQueryData(df, data)
+        }
+
+        it("SELF INNER JOIN") {
+            val qry =
+                """
+                  |SELECT
+                  | jt1.id,
+                  | jt1.val1,
+                  | jt2.id,
+                  | jt2.val1
+                  |FROM
+                  |     jt1 JOIN
+                  |     jt1 as jt2 ON jt1.val1 = jt2.val1
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+            checkOptimizationResult(df, "SELECT JT1.ID, JT1.VAL1, table1.ID, table1.VAL1 " +
+                "FROM JT1 JOIN JT1 AS table1 ON jt1.val1 = table1.val1 " +
+                "WHERE jt1.val1 IS NOT NULL AND table1.val1 IS NOT NULL")
+
+            val data = (
+                (1, "A", 1, "A"),
+                (2, "B", 2, "B"),
+                (3, "C", 3, "C")
+            )
+
+            checkQueryData(df, data)
+        }
+
+
+        it("SELF INNER JOIN WITH WHERE") {
+            val qry =
+                """
+                  |SELECT
+                  | jt1.id,
+                  | jt1.val1,
+                  | jt2.id,
+                  | jt2.val1
+                  |FROM
+                  |     jt1 JOIN
+                  |     jt1 as jt2 ON jt1.val1 = jt2.val1
+                  |WHERE jt2.val1 = 'A'
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+/*            checkOptimizationResult(df, "SELECT JT1.ID, JT1.VAL1, table1.ID, table1.VAL1 " +
+                "FROM JT1 JOIN JT1 as table1 ON JT1.val1 = table1.val1 " +
+                "WHERE JT1.val1 = 'A' AND JT1.val1 IS NOT NULL AND table1.val1 IS NOT NULL AND table1.val1 = 'A'")*/
+
+            val data = Tuple1(
+                (1, "A", 1, "A")
+            )
+
+            checkQueryData(df, data)
+        }
+
+
+        it("INNER JOIN") {
+            val qry =
+                """
+                  |SELECT
+                  | jt1.id as id1,
+                  | jt1.val1,
+                  | jt2.id as id2,
+                  | jt2.val2
+                  |FROM
+                  |     jt1 JOIN
+                  |     jt2 ON jt1.val1 = jt2.val2
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+            checkOptimizationResult(df, "SELECT JT1.ID AS id1, JT1.VAL1, JT2.ID AS id2, JT2.VAL2 " +
+                "FROM JT1 JOIN JT2 ON jt1.val1 = jt2.val2 " +
+                "WHERE jt1.val1 IS NOT NULL AND jt2.val2 IS NOT NULL")
+
+            val data = (
+                (2, "B", 1, "B"),
+                (3, "C", 2, "C")
+            )
+
+            checkQueryData(df, data)
+        }
+
+        it("INNER JOIN WITH WHERE") {
+            val qry =
+                """
+                  |SELECT
+                  | jt1.id as id1,
+                  | jt1.val1,
+                  | jt2.id as id2,
+                  | jt2.val2
+                  |FROM
+                  |     jt1 JOIN
+                  |     jt2 ON jt1.val1 = jt2.val2
+                  |WHERE
+                  | jt1.id < 10
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+            checkOptimizationResult(df, "SELECT jt1.id as id1, jt1.val1, jt2.id as id2, jt2.val2 " +
+                "FROM jt1 JOIN jt2 ON jt1.val1 = jt2.val2 " +
+                "WHERE jt1.id IS NOT NULL AND jt1.id < 10 AND jt1.val1 IS NOT NULL and jt2.val2 IS NOT NULL")
+
+            val data = (
+                (2, "B", 1, "B"),
+                (3, "C", 2, "C")
+            )
+
+            checkQueryData(df, data)
+        }
+
+        it("LEFT JOIN") {
+            val qry =
+                """
+                  |SELECT
+                  | jt1.id as id1,
+                  | jt1.val1,
+                  | jt2.id as id2,
+                  | jt2.val2
+                  |FROM
+                  | jt1 LEFT JOIN
+                  | jt2 ON jt1.val1 = jt2.val2
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+            checkOptimizationResult(df, "SELECT jt1.id as id1, jt1.val1, jt2.id as id2, jt2.val2 " +
+                "FROM jt1 LEFT JOIN jt2 ON jt1.val1 = jt2.val2")
+
+            val data = (
+                (1, "A", null, null),
+                (2, "B", 1, "B"),
+                (3, "C", 2, "C")
+            )
+
+            checkQueryData(df, data)
+        }
+
+        it("RIGHT JOIN") {
+            val qry =
+                """
+                  |SELECT
+                  | jt1.id as id1,
+                  | jt1.val1,
+                  | jt2.id as id2,
+                  | jt2.val2
+                  |FROM
+                  | jt1 RIGHT JOIN
+                  | jt2 ON jt1.val1 = jt2.val2
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+            checkOptimizationResult(df, "SELECT jt1.id as id1, jt1.val1, jt2.id as id2, jt2.val2 " +
+                "FROM jt1 RIGHT JOIN jt2 ON jt1.val1 = jt2.val2")
+
+            val data = (
+                (2, "B", 1, "B"),
+                (3, "C", 2, "C"),
+                (null, null, 3, "D")
+            )
+
+            checkQueryData(df, data, r ⇒ if (r.get(0) == null) 100L else r.getAs[Long](0))
+        }
+
+        it("JOIN 3 TABLE") {
+            val qry =
+                """
+                  |SELECT
+                  | jt1.id as id1,
+                  | jt1.val1 as val1,
+                  | jt2.id as id2,
+                  | jt2.val2 as val2,
+                  | jt3.id as id3,
+                  | jt3.val3 as val3
+                  |FROM
+                  | jt1 LEFT JOIN
+                  | jt2 ON jt1.val1 = jt2.val2 LEFT JOIN
+                  | jt3 ON jt1.val1 = jt3.val3
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+            checkOptimizationResult(df,
+                "SELECT table1.id as id1, table1.val1, table1.id_2 as id2, table1.val2, jt3.id as id3, jt3.val3 " +
+                    "FROM (" +
+                    "SELECT jt1.val1, jt1.id, jt2.val2, jt2.id as id_2 " +
+                    "FROM JT1 LEFT JOIN jt2 ON jt1.val1 = jt2.val2) table1 LEFT JOIN " +
+                    "jt3 ON table1.val1 = jt3.val3")
+
+            val data = (
+                (1, "A", null, null, 1, "A"),
+                (2, "B", 1, "B", null, null),
+                (3, "C", 2, "C", null, null))
+
+            checkQueryData(df, data)
+        }
+
+        it("JOIN 3 TABLE AND AGGREGATE") {
+            val qry =
+                """
+                  |SELECT SUM(id1) FROM (
+                  | SELECT
+                  |     jt1.id as id1,
+                  |     jt1.val1 as val1,
+                  |     jt2.id as id2,
+                  |     jt2.val2 as val2,
+                  |     jt3.id as id3,
+                  |     jt3.val3 as val3
+                  |FROM
+                  |     jt1 LEFT JOIN
+                  |     jt2 ON jt1.val1 = jt2.val2 LEFT JOIN
+                  |     jt3 ON jt1.val1 = jt3.val3
+                  |) WHERE CONCAT(val1, val2) = 'BB' OR CONCAT(val1, val3) = 'AA'
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+            checkOptimizationResult(df,
+            "SELECT CAST(SUM(table1.ID) AS BIGINT) AS \"sum(id1)\" FROM " +
+                "(SELECT JT1.VAL1, JT1.ID, JT2.VAL2 FROM JT1 LEFT JOIN JT2 ON JT1.val1 = JT2.val2) table1 LEFT JOIN " +
+                "JT3 ON table1.val1 = JT3.val3 " +
+                "WHERE CONCAT(table1.val1, table1.val2) = 'BB' OR CONCAT(table1.val1, JT3.val3) = 'AA'")
+
+            val data = Tuple1(3)
+
+            checkQueryData(df, data, _ ⇒ 0)
+        }
+
+        it("INNER JOIN SUBQUERY") {
+            val qry =
+                """
+                  |SELECT sum_id, val1, val2 FROM (
+                  |  SELECT
+                  |    jt1.id + jt2.id as sum_id,
+                  |    jt1.val1 as val1,
+                  |    jt2.val2 as val2
+                  |  FROM
+                  |     jt1 JOIN
+                  |     jt2 ON jt1.val1 = jt2.val2
+                  |) t1 WHERE sum_id != 15
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+            checkOptimizationResult(df,
+                "SELECT jt1.id + jt2.id as sum_id, jt1.val1, jt2.val2 FROM " +
+                    "jt1 JOIN jt2 ON NOT jt1.id + jt2.id = 15 AND jt1.val1 = jt2.val2 " +
+                    "WHERE " +
+                    "jt1.id IS NOT NULL AND " +
+                    "jt1.val1 IS NOT NULL AND " +
+                    "jt2.id IS NOT NULL AND " +
+                    "jt2.val2 IS NOT NULL"
+            )
+
+            val data = (
+                (3, "B", "B"),
+                (5, "C", "C")
+            )
+
+            checkQueryData(df, data)
+        }
+
+        it("INNER JOIN SUBQUERY - 2") {
+            val qry =
+                """
+                  |SELECT SUM(sum_id) FROM (
+                  |  SELECT
+                  |    jt1.id + jt2.id as sum_id
+                  |  FROM
+                  |     jt1 JOIN
+                  |     jt2 ON jt1.val1 = jt2.val2
+                  |) t1 WHERE sum_id != 15
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+            checkOptimizationResult(df,
+                "SELECT CAST(SUM(JT1.ID + JT2.ID) AS BIGINT) AS \"sum(sum_id)\" " +
+                    "FROM JT1 JOIN JT2 ON NOT JT1.id + JT2.id = 15 AND JT1.val1 = JT2.val2 " +
+                    "WHERE JT1.id IS NOT NULL AND JT1.val1 IS NOT NULL AND JT2.id IS NOT NULL AND JT2.val2 IS NOT NULL")
+
+            val data = Tuple1(8)
+
+            checkQueryData(df, data)
+        }
+    }
+
+    /**
+      * Creates tables `jt1`, `jt2` and `jt3`, inserts three rows into each and indexes their value columns.
+      *
+      * @param client Ignite instance the cache is obtained from.
+      * @param cacheName Name of the cache all statements are executed through.
+      */
+    def createJoinedTables(client: Ignite, cacheName: String): Unit = {
+        val cache = client.cache(cacheName)
+
+        def exec(stmt: SqlFieldsQuery): Unit = cache.query(stmt).getAll
+
+        def insert(sql: String, rows: Seq[(Long, String)]): Unit = {
+            val stmt = new SqlFieldsQuery(sql)
+
+            rows.foreach { case (id, value) ⇒ exec(stmt.setArgs(id.asInstanceOf[JLong], value)) }
+        }
+
+        exec(new SqlFieldsQuery(
+            """
+              | CREATE TABLE jt1 (
+              |    id LONG,
+              |    val1 VARCHAR,
+              |    PRIMARY KEY (id)) WITH "backups=1"
+            """.stripMargin))
+
+        exec(new SqlFieldsQuery(
+            """
+              | CREATE TABLE jt2 (
+              |    id LONG,
+              |    val2 VARCHAR,
+              |    PRIMARY KEY (id)) WITH "backups=1"
+            """.stripMargin))
+
+        exec(new SqlFieldsQuery(
+            """
+              | CREATE TABLE jt3 (
+              |    id LONG,
+              |    val3 VARCHAR,
+              |    PRIMARY KEY (id)) WITH "backups=1"
+            """.stripMargin))
+
+        insert("INSERT INTO jt1 (id, val1) values (?, ?)", Seq((1L, "A"), (2L, "B"), (3L, "C")))
+        insert("INSERT INTO jt2 (id, val2) values (?, ?)", Seq((1L, "B"), (2L, "C"), (3L, "D")))
+        insert("INSERT INTO jt3 (id, val3) values (?, ?)", Seq((1L, "A"), (2L, "D"), (3L, "E")))
+
+        exec(new SqlFieldsQuery("CREATE INDEX idx1 ON jt1(val1)"))
+        exec(new SqlFieldsQuery("CREATE INDEX idx2 ON jt2(val2)"))
+        exec(new SqlFieldsQuery("CREATE INDEX idx3 ON jt3(val3)"))
+    }
+
+    override protected def beforeAll(): Unit = {
+        super.beforeAll()
+
+        createPersonTable(client, DEFAULT_CACHE)
+
+        createCityTable(client, DEFAULT_CACHE)
+
+        createJoinedTables(client, DEFAULT_CACHE) // tables jt1, jt2 and jt3 queried by the JOIN cases
+
+        val configProvider = enclose(null) (x ⇒ () ⇒ { // NOTE(review): presumably avoids capturing `this` - confirm
+            val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
+
+            cfg.setClientMode(true) // configuration handed to the session describes a client node
+
+            cfg.setIgniteInstanceName("client-2")
+
+            cfg
+        })
+
+        igniteSession = IgniteSparkSession.builder()
+            .config(spark.sparkContext.getConf)
+            .igniteConfigProvider(configProvider)
+            .getOrCreate()
+    }
+}


[3/3] ignite git commit: IGNITE-7077: Implementation of Spark query optimization. - Fixes #3397.

Posted by ni...@apache.org.
IGNITE-7077: Implementation of Spark query optimization. - Fixes #3397.

Signed-off-by: Nikolay Izhikov <ni...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/10a4c48b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/10a4c48b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/10a4c48b

Branch: refs/heads/master
Commit: 10a4c48bf722f49e7eca09c50cf4af6c2ec59b1b
Parents: 8008601
Author: Nikolay Izhikov <ni...@apache.org>
Authored: Fri Apr 20 11:23:42 2018 +0300
Committer: Nikolay Izhikov <ni...@apache.org>
Committed: Fri Apr 20 11:23:42 2018 +0300

----------------------------------------------------------------------
 .../ignite/spark/IgniteDataFrameSettings.scala  |   9 +
 .../org/apache/ignite/spark/IgniteRDD.scala     |   7 +-
 .../spark/impl/IgniteRelationProvider.scala     |  20 +-
 .../impl/IgniteSQLAccumulatorRelation.scala     |  98 ++++
 .../spark/impl/IgniteSQLDataFrameRDD.scala      |  16 +-
 .../ignite/spark/impl/IgniteSQLRelation.scala   |  67 +--
 .../optimization/AggregateExpressions.scala     | 114 ++++
 .../optimization/ConditionExpressions.scala     | 160 ++++++
 .../impl/optimization/DateExpressions.scala     | 127 +++++
 .../impl/optimization/IgniteQueryContext.scala  |  52 ++
 .../impl/optimization/MathExpressions.scala     | 263 +++++++++
 .../impl/optimization/SimpleExpressions.scala   | 180 ++++++
 .../impl/optimization/StringExpressions.scala   | 154 ++++++
 .../optimization/SupportedExpressions.scala     |  42 ++
 .../impl/optimization/SystemExpressions.scala   | 122 +++++
 .../accumulator/JoinSQLAccumulator.scala        | 222 ++++++++
 .../accumulator/QueryAccumulator.scala          |  70 +++
 .../accumulator/SelectAccumulator.scala         |  70 +++
 .../accumulator/SingleTableSQLAccumulator.scala | 124 +++++
 .../accumulator/UnionSQLAccumulator.scala       |  63 +++
 .../spark/impl/optimization/package.scala       | 230 ++++++++
 .../org/apache/ignite/spark/impl/package.scala  |  48 +-
 .../spark/sql/ignite/IgniteOptimization.scala   | 436 +++++++++++++++
 .../spark/sql/ignite/IgniteSparkSession.scala   |  10 +-
 .../ignite/spark/AbstractDataFrameSpec.scala    |  68 ++-
 .../apache/ignite/spark/IgniteCatalogSpec.scala |   7 +-
 .../spark/IgniteDataFrameSchemaSpec.scala       |   5 +-
 .../ignite/spark/IgniteDataFrameSuite.scala     |   9 +-
 .../IgniteOptimizationAggregationFuncSpec.scala | 189 +++++++
 .../spark/IgniteOptimizationDateFuncSpec.scala  | 230 ++++++++
 .../IgniteOptimizationDisableEnableSpec.scala   | 127 +++++
 .../spark/IgniteOptimizationJoinSpec.scala      | 543 +++++++++++++++++++
 .../spark/IgniteOptimizationMathFuncSpec.scala  | 358 ++++++++++++
 .../ignite/spark/IgniteOptimizationSpec.scala   | 305 +++++++++++
 .../IgniteOptimizationStringFuncSpec.scala      | 313 +++++++++++
 .../IgniteOptimizationSystemFuncSpec.scala      | 147 +++++
 36 files changed, 4924 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
index 6bff476..9daaec4 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
@@ -153,4 +153,13 @@ object IgniteDataFrameSettings {
       * @see [[org.apache.ignite.IgniteDataStreamer#perNodeParallelOperations(int)]]
       */
     val OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS = "streamerPerNodeParallelOperations"
+
+    /**
+      * Option for a [[org.apache.spark.sql.SparkSession]] configuration.
+      * If `true`, all Ignite optimizations of Spark SQL statements are disabled.
+      * Default value is `false`.
+      *
+      * @see [[org.apache.spark.sql.ignite.IgniteOptimization]]
+      */
+    val OPTION_DISABLE_SPARK_SQL_OPTIMIZATION = "ignite.disableSparkSQLOptimization"
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index d87ea0a..5fb81b6 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -344,6 +344,11 @@ class IgniteRDD[K, V] (
 
 object IgniteRDD {
     /**
+      * Default decimal type.
+      */
+    private[spark] val DECIMAL = DecimalType(DecimalType.MAX_PRECISION, 3)
+
+    /**
       * Gets Spark data type based on type name.
       *
       * @param typeName Type name.
@@ -357,7 +362,7 @@ object IgniteRDD {
         case "java.lang.Long" ⇒ LongType
         case "java.lang.Float" ⇒ FloatType
         case "java.lang.Double" ⇒ DoubleType
-        case "java.math.BigDecimal" ⇒ DataTypes.createDecimalType()
+        case "java.math.BigDecimal" ⇒ DECIMAL
         case "java.lang.String" ⇒ StringType
         case "java.util.Date" ⇒ DateType
         case "java.sql.Date" ⇒ DateType

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
index a9f9f89..e4fa9f7 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
@@ -26,6 +26,7 @@ import org.apache.ignite.spark.IgniteDataFrameSettings._
 import org.apache.ignite.spark.impl.QueryHelper.{createTable, dropTable, ensureCreateTableOptions, saveTable}
 import org.apache.spark.sql.SaveMode.{Append, Overwrite}
 import org.apache.spark.sql.ignite.IgniteExternalCatalog.{IGNITE_PROTOCOL, OPTION_GRID}
+import org.apache.spark.sql.ignite.IgniteOptimization
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 
@@ -176,11 +177,28 @@ class IgniteRelationProvider extends RelationProvider
       * @param sqlCtx SQL context.
       * @return Ignite SQL relation.
       */
-    private def createRelation(igniteCtx: IgniteContext, tblName: String, sqlCtx: SQLContext): BaseRelation =
+    private def createRelation(igniteCtx: IgniteContext, tblName: String, sqlCtx: SQLContext): BaseRelation = {
+        val optimizationDisabled =
+            sqlCtx.sparkSession.conf.get(OPTION_DISABLE_SPARK_SQL_OPTIMIZATION, "false").toBoolean
+
+        val experimentalMethods = sqlCtx.sparkSession.sessionState.experimentalMethods
+
+        if (optimizationDisabled) {
+            experimentalMethods.extraOptimizations = 
+                experimentalMethods.extraOptimizations.filter(_ != IgniteOptimization)
+        } 
+        else {
+            val optimizationExists = experimentalMethods.extraOptimizations.contains(IgniteOptimization)
+
+            if (!optimizationExists)
+                experimentalMethods.extraOptimizations = experimentalMethods.extraOptimizations :+ IgniteOptimization
+        }
+
         IgniteSQLRelation(
             igniteCtx,
             tblName,
             sqlCtx)
+    }
 
     /**
       * @param params Params.

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLAccumulatorRelation.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLAccumulatorRelation.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLAccumulatorRelation.scala
new file mode 100644
index 0000000..6eb600a
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLAccumulatorRelation.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.spark.impl
+import org.apache.ignite.spark.impl.optimization.accumulator.{JoinSQLAccumulator, QueryAccumulator}
+import org.apache.ignite.spark.impl.optimization.isSimpleTableAcc
+import org.apache.spark.Partition
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.{Metadata, StructField, StructType}
+
+/**
+  * Relation that loads data with the query generated by <code>QueryAccumulator</code>.
+  * <code>QueryAccumulator</code> is generated by <code>IgniteOptimization</code>.
+  *
+  * @see IgniteOptimization
+  */
+class IgniteSQLAccumulatorRelation[K, V](val acc: QueryAccumulator)
+    (@transient val sqlContext: SQLContext) extends BaseRelation with TableScan {
+
+    /** @inheritdoc */
+    override def schema: StructType =
+        StructType(acc.output.map(col ⇒ StructField(col.name, col.dataType, col.nullable, Metadata.empty)))
+
+    /** @inheritdoc */
+    override def buildScan(): RDD[Row] = {
+        val qryCtx = acc.igniteQueryContext
+
+        IgniteSQLDataFrameRDD[K, V](
+            qryCtx.igniteContext,
+            qryCtx.cacheName,
+            schema,
+            acc.compileQuery(),
+            List.empty,
+            calcPartitions,
+            isDistributeJoin(acc))
+    }
+
+    /** @inheritdoc */
+    override def toString: String = {
+        val columns = acc.output.map(_.name).mkString(", ")
+
+        s"IgniteSQLAccumulatorRelation(columns=[$columns], qry=${acc.compileQuery()})"
+    }
+
+    /**
+      * @return Array of Spark partitions the query result is loaded through.
+      */
+    private def calcPartitions: Array[Partition] = {
+        val qryCtx = acc.igniteQueryContext
+
+        if (isSimpleTableAcc(acc))
+            impl.calcPartitions(qryCtx.igniteContext, qryCtx.cacheName)
+        else {
+            //If the accumulator stores a complex query (join, aggregation, limit, order, etc.),
+            //the data has to be loaded from Ignite as a single Spark partition.
+            val partCnt = qryCtx.igniteContext.ignite().affinity(qryCtx.cacheName).partitions()
+
+            Array(IgniteDataFramePartition(0, primary = null, igniteParts = List.range(0, partCnt)))
+        }
+    }
+
+    /**
+      * @param plan Plan.
+      * @return True if the plan or any of its descendants is a `JoinSQLAccumulator`, false otherwise.
+      */
+    private def isDistributeJoin(plan: LogicalPlan): Boolean = plan match {
+        case _: JoinSQLAccumulator ⇒ true
+        case other ⇒ other.children.exists(isDistributeJoin)
+    }
+}
+
+object IgniteSQLAccumulatorRelation {
+    /**
+      * @param acc Accumulator holding the query to execute.
+      * @return Relation that uses the SQL context of the accumulator's query context.
+      */
+    def apply[K, V](acc: QueryAccumulator): IgniteSQLAccumulatorRelation[K, V] =
+        new IgniteSQLAccumulatorRelation[K, V](acc)(acc.igniteQueryContext.sqlContext)
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala
index 93ef529..ec502fc 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala
@@ -36,7 +36,8 @@ class IgniteSQLDataFrameRDD[K, V](
     schema: StructType,
     qryStr: String,
     args: List[_],
-    parts: Array[Partition]) extends
+    parts: Array[Partition],
+    distributedJoin: Boolean) extends
     IgniteSqlRDD[Row, JList[_], K, V](
         ic,
         cacheName,
@@ -56,13 +57,17 @@ class IgniteSQLDataFrameRDD[K, V](
     override def compute(partition: Partition, context: TaskContext): Iterator[Row] = {
         val qry0 = new SqlFieldsQuery(qryStr)
 
+        qry0.setDistributedJoins(distributedJoin)
+
         if (args.nonEmpty)
             qry0.setArgs(args.map(_.asInstanceOf[Object]): _*)
 
         val ccfg = ic.ignite().cache[K, V](cacheName).getConfiguration(classOf[CacheConfiguration[K, V]])
 
-        if (ccfg.getCacheMode != CacheMode.REPLICATED)
-            qry0.setPartitions(partition.asInstanceOf[IgniteDataFramePartition].igniteParts: _*)
+        val ignitePartition = partition.asInstanceOf[IgniteDataFramePartition]
+
+        if (ccfg.getCacheMode != CacheMode.REPLICATED && ignitePartition.igniteParts.nonEmpty && !distributedJoin)
+            qry0.setPartitions(ignitePartition.igniteParts: _*)
 
         qry = qry0
 
@@ -76,7 +81,8 @@ object IgniteSQLDataFrameRDD {
         schema: StructType,
         qryStr: String,
         args: List[_],
-        parts: Array[Partition] = Array(IgnitePartition(0))) = {
-        new IgniteSQLDataFrameRDD(ic, cacheName, schema, qryStr, args, parts)
+        parts: Array[Partition] = Array(IgnitePartition(0)),
+        distributedJoin: Boolean = false): IgniteSQLDataFrameRDD[K, V] = {
+        new IgniteSQLDataFrameRDD[K, V](ic, cacheName, schema, qryStr, args, parts, distributedJoin)
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
index 1fb8de7..485ddf6 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
@@ -18,10 +18,8 @@
 package org.apache.ignite.spark.impl
 
 import org.apache.ignite.IgniteException
-import org.apache.ignite.cache.{CacheMode, QueryEntity}
-import org.apache.ignite.cluster.ClusterNode
-import org.apache.ignite.configuration.CacheConfiguration
-import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
+import org.apache.ignite.cache.QueryEntity
+import org.apache.ignite.spark.{IgniteContext, IgniteRDD, impl}
 import org.apache.spark.Partition
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@ -30,14 +28,13 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Row, SQLContext}
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
 
 /**
   * Apache Ignite implementation of Spark BaseRelation with PrunedFilteredScan for Ignite SQL Tables
   */
 class IgniteSQLRelation[K, V](
-    private[spark] val ic: IgniteContext,
-    private[spark] val tableName: String)
+    private[apache] val ic: IgniteContext,
+    private[apache] val tableName: String)
     (@transient val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan with Logging {
 
     /**
@@ -92,41 +89,13 @@ class IgniteSQLRelation[K, V](
         qryAndArgs
     }
 
-    private def calcPartitions(filters: Array[Filter]): Array[Partition] = {
-        val cache = ic.ignite().cache[K, V](cacheName)
-
-        val ccfg = cache.getConfiguration(classOf[CacheConfiguration[K, V]])
-
-        if (ccfg.getCacheMode == CacheMode.REPLICATED) {
-            val serverNodes = ic.ignite().cluster().forCacheNodes(cacheName).forServers().nodes()
-
-            Array(IgniteDataFramePartition(0, serverNodes.head, Stream.from(0).take(1024).toList))
-        }
-        else {
-            val aff = ic.ignite().affinity(cacheName)
-
-            val parts = aff.partitions()
-
-            val nodesToParts = (0 until parts).foldLeft(Map[ClusterNode, ArrayBuffer[Int]]()) {
-                case (nodeToParts, ignitePartIdx) ⇒
-                    val primary = aff.mapPartitionToPrimaryAndBackups(ignitePartIdx).head
-
-                    if (nodeToParts.contains(primary)) {
-                        nodeToParts(primary) += ignitePartIdx
-
-                        nodeToParts
-                    }
-                    else
-                        nodeToParts + (primary → ArrayBuffer[Int](ignitePartIdx))
-            }
-
-            val partitions = nodesToParts.zipWithIndex.map { case ((node, nodesParts), i) ⇒
-                IgniteDataFramePartition(i, node, nodesParts.toList)
-            }
-
-            partitions.toArray
-        }
-    }
+    /**
+      * Computes spark partitions for this relation.
+      *
+      * @return Array of IgniteDataFramePartition.
+      */
+    private def calcPartitions(filters: Array[Filter]): Array[Partition] =
+        impl.calcPartitions(ic, cacheName)
 
     /**
       * Cache name for a table name.
@@ -134,20 +103,6 @@ class IgniteSQLRelation[K, V](
     private lazy val cacheName: String =
         sqlCacheName(ic.ignite(), tableName)
             .getOrElse(throw new IgniteException(s"Unknown table $tableName"))
-
-    /**
-      * Utility method to add clause to sql WHERE string.
-      *
-      * @param filterStr Current filter string
-      * @param clause Clause to add.
-      * @return Filter string.
-      */
-    private def addStrClause(filterStr: String, clause: String) =
-        if (filterStr.isEmpty)
-            clause
-        else
-            filterStr + " AND " + clause
-
 }
 
 object IgniteSQLRelation {

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala
new file mode 100644
index 0000000..421a9a9
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization
+
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.types._
+
+/**
+  * Object to support aggregate expressions like `sum` or `avg`.
+  */
+private[optimization] object AggregateExpressions extends SupportedExpressions {
+    /** @inheritdoc */
+    def apply(expr: Expression, checkChild: (Expression) ⇒ Boolean): Boolean = expr match {
+        case AggregateExpression(aggregateFunction, _, _, _) ⇒
+            checkChild(aggregateFunction)
+
+        case Average(child) ⇒
+            checkChild(child)
+
+        case Count(children) ⇒
+            children.forall(checkChild)
+
+        case Max(child) ⇒
+            checkChild(child)
+
+        case Min(child) ⇒
+            checkChild(child)
+
+        case Sum(child) ⇒
+            checkChild(child)
+
+        case _ ⇒
+            false
+    }
+
+    /** @inheritdoc */
+    override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
+        useAlias: Boolean): Option[String] = expr match {
+        case AggregateExpression(aggregateFunction, _, isDistinct, _) ⇒
+            //DISTINCT is a flag of the wrapper and is not visible to `childToString(aggregateFunction)`,
+            //so it has to be rendered here, including `AVG(DISTINCT ...)`.
+            val distinct = if (isDistinct) "DISTINCT " else ""
+
+            aggregateFunction match {
+                case Count(children) ⇒
+                    Some(s"COUNT($distinct${children.map(childToString(_)).mkString(" ")})")
+
+                case sum: Sum ⇒
+                    Some(castSum(s"SUM($distinct${childToString(sum.child)})", sum.dataType))
+
+                case Average(child) if isDistinct ⇒
+                    Some(avgSql(child, childToString, distinct))
+
+                case _ ⇒
+                    Some(childToString(aggregateFunction))
+            }
+
+        case Average(child) ⇒
+            Some(avgSql(child, childToString, distinct = ""))
+
+        case Count(children) ⇒
+            Some(s"COUNT(${children.map(childToString(_)).mkString(" ")})")
+
+        case Max(child) ⇒
+            Some(s"MAX(${childToString(child)})")
+
+        case Min(child) ⇒
+            Some(s"MIN(${childToString(child)})")
+
+        case sum: Sum ⇒
+            Some(castSum(s"SUM(${childToString(sum.child)})", sum.dataType))
+
+        case _ ⇒
+            None
+    }
+
+    /**
+      * Renders `AVG`; `distinct` is either an empty string or `"DISTINCT "`. Spark `AVG` always returns a double
+      * or a decimal while Ignite `AVG` over integral types is integral, so other columns are cast to double.
+      */
+    private def avgSql(child: Expression, childToString: Expression ⇒ String, distinct: String): String =
+        child.dataType match {
+            case DecimalType() | DoubleType ⇒
+                s"AVG($distinct${childToString(child)})"
+
+            case _ ⇒
+                s"AVG(${distinct}CAST(${childToString(child)} AS DOUBLE))"
+        }
+
+    /** Ignite returns BigDecimal but Spark expects BIGINT, so a `LongType` sum is cast explicitly. */
+    private def castSum(sumSql: String, dataType: DataType): String = dataType match {
+        case LongType ⇒
+            s"CAST($sumSql AS BIGINT)"
+
+        case _ ⇒
+            sumSql
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala
new file mode 100644
index 0000000..fbfbd64
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.spark.impl.optimization
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, _}
+
+/**
+  * Object to support condition expression. Like `and` or `in` operators.
+  */
+private[optimization] object ConditionExpressions extends SupportedExpressions {
+    /**
+      * Checks whether a condition expression can be handled by this object.
+      *
+      * Recognises comparisons, `IN` lists, null checks, boolean operators and string predicates.
+      * Support of the operands is always decided by `checkChild`.
+      *
+      * @param expr Expression to check.
+      * @param checkChild Closure to check child expression.
+      * @return True if `expr` is one of the recognised conditions and its checked operands
+      *         are supported, false otherwise.
+      */
+    def apply(expr: Expression, checkChild: (Expression) ⇒ Boolean): Boolean = {
+        //Binary node: the right operand is checked only when the left one has passed.
+        def both(lhs: Expression, rhs: Expression): Boolean =
+            checkChild(lhs) && checkChild(rhs)
+
+        expr match {
+            //Comparisons.
+            case EqualTo(lhs, rhs) ⇒ both(lhs, rhs)
+            case EqualNullSafe(lhs, rhs) ⇒ both(lhs, rhs)
+            case GreaterThan(lhs, rhs) ⇒ both(lhs, rhs)
+            case GreaterThanOrEqual(lhs, rhs) ⇒ both(lhs, rhs)
+            case LessThan(lhs, rhs) ⇒ both(lhs, rhs)
+            case LessThanOrEqual(lhs, rhs) ⇒ both(lhs, rhs)
+
+            //Membership tests: accepted only when every listed value is a `Literal`,
+            //and only the tested value is handed to `checkChild`.
+            //NOTE(review): `toString` of this object has no `InSet` case - confirm it is rendered elsewhere.
+            case InSet(tested, values) if values.forall(_.isInstanceOf[Literal]) ⇒
+                checkChild(tested)
+
+            case In(tested, values) if values.forall(_.isInstanceOf[Literal]) ⇒
+                checkChild(tested)
+
+            //Null checks.
+            case IsNull(operand) ⇒ checkChild(operand)
+            case IsNotNull(operand) ⇒ checkChild(operand)
+
+            //Boolean operators.
+            case And(lhs, rhs) ⇒ both(lhs, rhs)
+            case Or(lhs, rhs) ⇒ both(lhs, rhs)
+            case Not(operand) ⇒ checkChild(operand)
+
+            //String predicates.
+            case StartsWith(lhs, rhs) ⇒ both(lhs, rhs)
+            case EndsWith(lhs, rhs) ⇒ both(lhs, rhs)
+            case Contains(lhs, rhs) ⇒ both(lhs, rhs)
+
+            //Anything else is not a condition known to this object.
+            case _ ⇒
+                false
+        }
+    }
+
+    /**
+      * Renders a supported condition expression as SQL text.
+      *
+      * @param expr Expression to convert to string.
+      * @param childToString Closure to convert children expressions.
+      * @param useQualifier Not used by this object.
+      * @param useAlias Not used by this object.
+      * @return SQL representation of `expr` if it is supported, `None` otherwise.
+      */
+    override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
+        useAlias: Boolean): Option[String] = {
+        //Nested boolean operators lose their tree structure once flattened to text, so an operand
+        //is parenthesized when SQL precedence (NOT, then AND, then OR) would regroup it.
+        def grouped(child: Expression, wrap: Boolean): String =
+            if (wrap) s"(${childToString(child)})" else childToString(child)
+
+        //Expecting string literal as `value`: to add % signs it's required to remove quotes.
+        //NOTE(review): a non-literal `value` would be rendered as pattern text too - confirm callers.
+        def like(attr: Expression, value: Expression, prefix: String, suffix: String): Option[String] = {
+            val valStr = removeQuotes(childToString(value))
+            Some(s"${childToString(attr)} LIKE '$prefix$valStr$suffix'")
+        }
+
+        expr match {
+            case EqualTo(left, right) ⇒
+                Some(s"${childToString(left)} = ${childToString(right)}")
+
+            case EqualNullSafe(left, right) ⇒
+                val (l, r) = (childToString(left), childToString(right))
+                //Null-safe equality: true for two NULLs, false when exactly one side is NULL.
+                Some(s"(($l IS NULL AND $r IS NULL) OR ($l IS NOT NULL AND $r IS NOT NULL AND $l = $r))")
+
+            case GreaterThan(left, right) ⇒
+                Some(s"${childToString(left)} > ${childToString(right)}")
+
+            case GreaterThanOrEqual(left, right) ⇒
+                Some(s"${childToString(left)} >= ${childToString(right)}")
+
+            case LessThan(left, right) ⇒
+                Some(s"${childToString(left)} < ${childToString(right)}")
+
+            case LessThanOrEqual(left, right) ⇒
+                Some(s"${childToString(left)} <= ${childToString(right)}")
+
+            case In(attr, values) ⇒
+                Some(s"${childToString(attr)} IN (${values.map(childToString(_)).mkString(", ")})")
+
+            case IsNull(child) ⇒ Some(s"${childToString(child)} IS NULL")
+            case IsNotNull(child) ⇒ Some(s"${childToString(child)} IS NOT NULL")
+
+            case And(left, right) ⇒
+                Some(s"${grouped(left, left.isInstanceOf[Or])} AND ${grouped(right, right.isInstanceOf[Or])}")
+
+            case Or(left, right) ⇒
+                Some(s"${childToString(left)} OR ${childToString(right)}")
+
+            case Not(child) ⇒
+                Some(s"NOT ${grouped(child, child.isInstanceOf[And] || child.isInstanceOf[Or])}")
+
+            case StartsWith(attr, value) ⇒ like(attr, value, "", "%")
+            case EndsWith(attr, value) ⇒ like(attr, value, "%", "")
+            case Contains(attr, value) ⇒ like(attr, value, "%", "%")
+
+            case _ ⇒ None
+        }
+    }
+
+    /**
+      * Strips one pair of enclosing single quotes.
+      *
+      * @param str String to process.
+      * @return Str without surrounding quotes; `str` itself when it is shorter than two characters
+      *         or is not enclosed in `'` on both sides.
+      */
+    private def removeQuotes(str: String): String = {
+        val quoted = str.length >= 2 && str.startsWith("'") && str.endsWith("'")
+        if (quoted)
+            str.substring(1, str.length - 1)
+        else
+            str
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
new file mode 100644
index 0000000..d075bf0
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, _}
+
+/**
+  * Object to support expressions to work with date/timestamp.
+  */
+private[optimization] object DateExpressions extends SupportedExpressions {
+    /**
+      * @param expr Expression to check.
+      * @param checkChild Closure to check child expression.
+      * @return True if `expr` is a date/timestamp expression known to this object with supported operands.
+      */
+    def apply(expr: Expression, checkChild: (Expression) ⇒ Boolean): Boolean = expr match {
+        case CurrentDate(None) | CurrentTimestamp() ⇒
+            true
+        //Date arithmetic: both operands have to be supported.
+        case DateAdd(start, amount) ⇒
+            Seq(start, amount).forall(checkChild)
+
+        case DateDiff(first, second) ⇒
+            Seq(first, second).forall(checkChild)
+        //Field extraction: the second argument of `Hour`, `Minute` and `Second` is not checked.
+        case DayOfMonth(source) ⇒
+            checkChild(source)
+
+        case DayOfYear(source) ⇒
+            checkChild(source)
+
+        case Hour(source, _) ⇒
+            checkChild(source)
+
+        case Minute(source, _) ⇒
+            checkChild(source)
+
+        case Month(source) ⇒
+            checkChild(source)
+
+        case Quarter(source) ⇒
+            checkChild(source)
+
+        case Second(source, _) ⇒
+            checkChild(source)
+
+        case WeekOfYear(source) ⇒
+            checkChild(source)
+
+        case Year(source) ⇒
+            checkChild(source)
+        //Parsing: the format is optional and is checked only when it is present.
+        case ParseToDate(left, format, child) ⇒
+            checkChild(left) && format.forall(checkChild) && checkChild(child)
+
+        case _ ⇒ false
+    }
+
+    /**
+      * Renders a supported date/timestamp expression as SQL text.
+      *
+      * @param expr Expression to convert to string.
+      * @param childToString Closure to convert children expressions.
+      * @param useQualifier Not used by this object.
+      * @param useAlias Not used by this object.
+      * @return SQL representation of `expr` if it is supported, `None` otherwise.
+      */
+    override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
+        useAlias: Boolean): Option[String] = expr match {
+        case CurrentDate(_) ⇒ Some(s"CURRENT_DATE()")
+        case CurrentTimestamp() ⇒ Some(s"CURRENT_TIMESTAMP()")
+
+        case DateAdd(startDate, days) ⇒
+            Some(s"CAST(DATEADD('DAY', ${childToString(days)}, ${childToString(startDate)}) AS DATE)")
+
+        //NOTE(review): operands keep the Spark order - confirm `DATEDIFF` yields the same sign as Spark.
+        case DateDiff(date1, date2) ⇒
+            Some(s"CAST(DATEDIFF('DAY', ${childToString(date1)}, ${childToString(date2)}) AS INT)")
+
+        case DayOfMonth(date) ⇒
+            Some(s"DAY_OF_MONTH(${childToString(date)})")
+
+        case DayOfYear(date) ⇒
+            Some(s"DAY_OF_YEAR(${childToString(date)})")
+
+        case Hour(date, _) ⇒
+            Some(s"HOUR(${childToString(date)})")
+
+        case Minute(date, _) ⇒
+            Some(s"MINUTE(${childToString(date)})")
+
+        case Month(date) ⇒
+            Some(s"MONTH(${childToString(date)})")
+
+        case ParseToDate(left, formatOption, _) ⇒
+            Some(s"PARSEDATETIME(${(left +: formatOption.toSeq).map(childToString).mkString(", ")})")
+
+        case Quarter(date) ⇒
+            Some(s"QUARTER(${childToString(date)})")
+
+        case Second(date, _) ⇒
+            Some(s"SECOND(${childToString(date)})")
+
+        case WeekOfYear(date) ⇒
+            Some(s"WEEK(${childToString(date)})")
+
+        case Year(date) ⇒
+            Some(s"YEAR(${childToString(date)})")
+
+        case _ ⇒ None
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/IgniteQueryContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/IgniteQueryContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/IgniteQueryContext.scala
new file mode 100644
index 0000000..c5a7f34
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/IgniteQueryContext.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization
+
+import org.apache.ignite.spark.IgniteContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+
+/**
+  * Class to store Ignite query info during optimization process.
+  *
+  * @param igniteContext IgniteContext.
+  * @param sqlContext SQLContext.
+  * @param cacheName Cache name.
+  * @param aliasIndex Iterator to generate indexes for auto-generated aliases.
+  * @param catalogTable CatalogTable from source relation.
+  * @param distributeJoin Distributed join flag, `false` by default (presumably enables
+  *                       Ignite distributed joins - TODO confirm against the usages).
+  */
+case class IgniteQueryContext(
+    igniteContext: IgniteContext,
+    sqlContext: SQLContext,
+    cacheName: String,
+    aliasIndex: Iterator[Int],
+    catalogTable: Option[CatalogTable] = None,
+    distributeJoin: Boolean = false
+) {
+    /** @return Unique table alias: `table` followed by the next value of `aliasIndex`. */
+    def uniqueTableAlias: String = s"table${aliasIndex.next}"
+
+    /**
+      * @param col Column.
+      * @return Unique column alias: column name, `_` and the next value of `aliasIndex`.
+      */
+    def uniqueColumnAlias(col: NamedExpression): String = s"${col.name}_${aliasIndex.next}"
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
new file mode 100644
index 0000000..dc05e95
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, _}
+
+/**
+  * Object to support math expressions.
+  */
+private[optimization] object MathExpressions extends SupportedExpressions {
+    /**
+      * @param expr Expression to check.
+      * @param checkChild Closure to check child expression.
+      * @return True if `expr` is a math expression known to this object with supported operands.
+      */
+    def apply(expr: Expression, checkChild: (Expression) ⇒ Boolean): Boolean = expr match {
+        case Abs(arg) ⇒
+            checkChild(arg)
+        //Trigonometric and hyperbolic functions.
+        case Acos(arg) ⇒
+            checkChild(arg)
+
+        case Asin(arg) ⇒
+            checkChild(arg)
+
+        case Atan(arg) ⇒
+            checkChild(arg)
+
+        case Cos(arg) ⇒
+            checkChild(arg)
+
+        case Cosh(arg) ⇒
+            checkChild(arg)
+
+        case Sin(arg) ⇒
+            checkChild(arg)
+
+        case Sinh(arg) ⇒
+            checkChild(arg)
+
+        case Tan(arg) ⇒
+            checkChild(arg)
+
+        case Tanh(arg) ⇒
+            checkChild(arg)
+
+        case Atan2(first, second) ⇒
+            Seq(first, second).forall(checkChild)
+        //Bitwise operators.
+        case BitwiseAnd(first, second) ⇒
+            Seq(first, second).forall(checkChild)
+
+        case BitwiseOr(first, second) ⇒
+            Seq(first, second).forall(checkChild)
+
+        case BitwiseXor(first, second) ⇒
+            Seq(first, second).forall(checkChild)
+
+        case Ceil(arg) ⇒
+            checkChild(arg)
+
+        case ToDegrees(arg) ⇒
+            checkChild(arg)
+
+        case Exp(arg) ⇒
+            checkChild(arg)
+
+        case Floor(arg) ⇒
+            checkChild(arg)
+
+        case Log(arg) ⇒
+            checkChild(arg)
+
+        case Log10(arg) ⇒
+            checkChild(arg)
+
+        case Logarithm(first, second) ⇒
+            Seq(first, second).forall(checkChild)
+
+        case ToRadians(arg) ⇒
+            checkChild(arg)
+
+        case Sqrt(arg) ⇒
+            checkChild(arg)
+        //Constants have no operands to check.
+        case _: Pi | _: EulerNumber ⇒ true
+
+        case Pow(first, second) ⇒
+            Seq(first, second).forall(checkChild)
+
+        case Rand(arg) ⇒
+            checkChild(arg)
+
+        case Round(value, scale) ⇒
+            Seq(value, scale).forall(checkChild)
+
+        case Signum(arg) ⇒
+            checkChild(arg)
+        //Arithmetic operators.
+        case Remainder(first, second) ⇒
+            Seq(first, second).forall(checkChild)
+
+        case Divide(first, second) ⇒
+            Seq(first, second).forall(checkChild)
+
+        case Multiply(first, second) ⇒
+            Seq(first, second).forall(checkChild)
+
+        case Subtract(first, second) ⇒
+            Seq(first, second).forall(checkChild)
+
+        case Add(first, second) ⇒
+            Seq(first, second).forall(checkChild)
+
+        case UnaryMinus(arg) ⇒
+            checkChild(arg)
+
+        case UnaryPositive(arg) ⇒
+            checkChild(arg)
+
+        case _ ⇒ false
+    }
+
+    /**
+      * Renders a supported math expression as SQL text.
+      *
+      * Operands of the infix and sign operators are put in parentheses when a nested arithmetic
+      * expression would otherwise be regrouped by SQL operator precedence.
+      *
+      * @param expr Expression to convert to string.
+      * @param childToString Closure to convert children expressions.
+      * @param useQualifier Not used by this object.
+      * @param useAlias Not used by this object.
+      * @return SQL representation of `expr` if it is supported, `None` otherwise.
+      */
+    override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
+        useAlias: Boolean): Option[String] = {
+        def additive(e: Expression): Boolean = e.isInstanceOf[Add] || e.isInstanceOf[Subtract]
+
+        def arithmetic(e: Expression): Boolean =
+            additive(e) || e.isInstanceOf[Multiply] || e.isInstanceOf[Divide] || e.isInstanceOf[Remainder]
+
+        //Renders an operand of an infix operator, in parentheses when `wrap` holds for it.
+        def operand(child: Expression, wrap: Expression ⇒ Boolean): String =
+            if (wrap(child)) s"(${childToString(child)})" else childToString(child)
+
+        //Renders an operand of a sign operator: nested arithmetic is parenthesized, and so is
+        //text starting with the same sign (`--` would start a SQL comment).
+        def signed(sign: String, child: Expression): Option[String] = {
+            val str = childToString(child)
+
+            Some(if (arithmetic(child) || str.startsWith(sign)) s"$sign($str)" else s"$sign$str")
+        }
+
+        expr match {
+            case Abs(child) ⇒
+                Some(s"ABS(${childToString(child)})")
+            case Acos(child) ⇒
+                Some(s"ACOS(${childToString(child)})")
+            case Asin(child) ⇒
+                Some(s"ASIN(${childToString(child)})")
+            case Atan(child) ⇒
+                Some(s"ATAN(${childToString(child)})")
+            case Cos(child) ⇒
+                Some(s"COS(${childToString(child)})")
+            case Cosh(child) ⇒
+                Some(s"COSH(${childToString(child)})")
+            case Sin(child) ⇒
+                Some(s"SIN(${childToString(child)})")
+            case Sinh(child) ⇒
+                Some(s"SINH(${childToString(child)})")
+            case Tan(child) ⇒
+                Some(s"TAN(${childToString(child)})")
+            case Tanh(child) ⇒
+                Some(s"TANH(${childToString(child)})")
+            case Atan2(left, right) ⇒
+                Some(s"ATAN2(${childToString(left)}, ${childToString(right)})")
+
+            case BitwiseAnd(left, right) ⇒
+                Some(s"BITAND(${childToString(left)}, ${childToString(right)})")
+            case BitwiseOr(left, right) ⇒
+                Some(s"BITOR(${childToString(left)}, ${childToString(right)})")
+            case BitwiseXor(left, right) ⇒
+                Some(s"BITXOR(${childToString(left)}, ${childToString(right)})")
+
+            case Ceil(child) ⇒
+                Some(s"CAST(CEIL(${childToString(child)}) AS LONG)")
+            case ToDegrees(child) ⇒
+                Some(s"DEGREES(${childToString(child)})")
+            case Exp(child) ⇒
+                Some(s"EXP(${childToString(child)})")
+            case Floor(child) ⇒
+                Some(s"CAST(FLOOR(${childToString(child)}) AS LONG)")
+            case Log(child) ⇒
+                Some(s"LOG(${childToString(child)})")
+            case Log10(child) ⇒
+                Some(s"LOG10(${childToString(child)})")
+
+            case Logarithm(base, arg) ⇒
+                childToString(base) match {
+                    //Spark internally converts LN(XXX) to LOG(2.718281828459045, XXX).
+                    //H2 doesn't have builtin function for a free base logarithm, so usage of
+                    //log(a, b) = ln(a)/ln(b) is avoided when the base is a known one.
+                    case "2.718281828459045" ⇒
+                        Some(s"LOG(${childToString(arg)})")
+                    case "10" ⇒
+                        Some(s"LOG10(${childToString(arg)})")
+                    case baseStr ⇒
+                        Some(s"(LOG(${childToString(arg)})/LOG($baseStr))")
+                }
+
+            case ToRadians(child) ⇒
+                Some(s"RADIANS(${childToString(child)})")
+            case Sqrt(child) ⇒
+                Some(s"SQRT(${childToString(child)})")
+            case _: Pi ⇒ Some("PI()")
+            case _: EulerNumber ⇒ Some("E()")
+            case Pow(left, right) ⇒
+                Some(s"POWER(${childToString(left)}, ${childToString(right)})")
+            case Rand(child) ⇒
+                Some(s"RAND(${childToString(child)})")
+            case Round(child, scale) ⇒
+                Some(s"ROUND(${childToString(child)}, ${childToString(scale)})")
+            case Signum(child) ⇒
+                Some(s"SIGN(${childToString(child)})")
+
+            //Infix operators: an operand is parenthesized when it would bind differently as plain text.
+            case Remainder(left, right) ⇒
+                Some(s"${operand(left, additive)} % ${operand(right, arithmetic)}")
+            case Divide(left, right) ⇒
+                Some(s"${operand(left, additive)} / ${operand(right, arithmetic)}")
+            case Multiply(left, right) ⇒
+                Some(s"${operand(left, additive)} * ${operand(right, arithmetic)}")
+            case Subtract(left, right) ⇒
+                Some(s"${childToString(left)} - ${operand(right, additive)}")
+            case Add(left, right) ⇒
+                Some(s"${childToString(left)} + ${operand(right, additive)}")
+
+            case UnaryMinus(child) ⇒
+                signed("-", child)
+            case UnaryPositive(child) ⇒
+                signed("+", child)
+
+            case _ ⇒
+                None
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala
new file mode 100644
index 0000000..a1c9458
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala
@@ -0,0 +1,180 @@
+package org.apache.ignite.spark.impl.optimization
+
+import java.text.SimpleDateFormat
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, _}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+
+/**
+  * Object to support some 'simple' expressions like aliases.
+  */
+private[optimization] object SimpleExpressions extends SupportedExpressions {
+    /**
+      * Checks whether a 'simple' expression can be handled by this object.
+      *
+      * @param expr Expression to check.
+      * @param checkChild Closure to check child expression.
+      * @return True for literals and attributes, for an alias of a supported child and for
+      *         a supported cast of a supported child; false otherwise.
+      */
+    override def apply(expr: Expression, checkChild: Expression ⇒ Boolean): Boolean = expr match {
+        case Literal(_, _) | _: Attribute ⇒ true
+        case Alias(aliased, _) ⇒ checkChild(aliased)
+
+        //A cast needs a supported operand and a supported source/target type pair.
+        case Cast(operand, target, _) ⇒ checkChild(operand) && castSupported(from = operand.dataType, to = target)
+
+        case _ ⇒ false
+    }
+
+    /**
+      * Renders a supported 'simple' expression (literal, attribute, alias, cast, sort order) as SQL text.
+      *
+      * String literals are quoted with embedded quotes doubled; literals without a value are printed as `null`.
+      *
+      * @param expr Expression to convert to string.
+      * @param childToString Closure to convert children expressions.
+      * @param useQualifier If true attribute names are prefixed with their qualifier, when they have one.
+      * @param useAlias If true attributes and aliased expressions are printed with their alias.
+      * @return SQL representation of `expr` if it is supported, `None` otherwise.
+      */
+    override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
+        useAlias: Boolean): Option[String] = expr match {
+        //A literal without a value is NULL whatever its type is; `value.toString` would fail on it.
+        case l: Literal if l.value == null ⇒
+            Some("null")
+
+        case l: Literal ⇒ (l.dataType, l.value) match {
+            case (StringType, str) ⇒
+                //A quote inside of the value is doubled, otherwise it would end the SQL string early.
+                Some("'" + str.toString.replace("'", "''") + "'")
+
+            //Internal representation of TimestampType is Long, so it is converted to a CAST call.
+            case (TimestampType, ts: Long) ⇒
+                Some(s"CAST('${timestampFormat.get.format(DateTimeUtils.toJavaTimestamp(ts))}' AS TIMESTAMP)")
+
+            //Internal representation of DateType is Int, so it is converted to a CAST call.
+            case (DateType, days: Integer) ⇒
+                val date = new java.util.Date(DateTimeUtils.daysToMillis(days))
+
+                Some(s"CAST('${dateFormat.get.format(date)}' AS DATE)")
+
+            case (_, other) ⇒
+                Some(other.toString)
+        }
+
+        case ar: AttributeReference ⇒
+            val name =
+                if (useQualifier)
+                    ar.qualifier.map(_ + "." + ar.name).getOrElse(ar.name)
+                else
+                    ar.name
+
+            //The alias kept in metadata is printed only when asked for and when it differs from the name.
+            if (ar.metadata.contains(ALIAS) && !isAliasEqualColumnName(ar.metadata.getString(ALIAS), ar.name) && useAlias)
+                Some(aliasToString(name, ar.metadata.getString(ALIAS)))
+            else
+                Some(name)
+
+        case Alias(child, name) ⇒
+            if (useAlias)
+                Some(aliasToString(childToString(child), name))
+            else
+                Some(childToString(child))
+
+        case Cast(child, dataType, _) ⇒
+            Some(s"CAST(${childToString(child)} AS ${toSqlType(dataType)})")
+
+        //Only the descending direction is printed explicitly.
+        case SortOrder(child, direction, _, _) ⇒
+            Some(s"${childToString(child)}${if(direction==Descending) " DESC" else ""}")
+
+        case _ ⇒
+            None
+    }
+
+    /**
+      * @param column Column name.
+      * @param alias Alias.
+      * @return SQL String for column with alias: just `column` when the alias equals it, `column AS alias`
+      *         otherwise. The alias is double-quoted unless it matches `[A-Za-z_][0-9A-Za-z_]*`.
+      */
+    private def aliasToString(column: String, alias: String): String =
+        if (isAliasEqualColumnName(alias, column)) column
+        else {
+            val printed = if (alias.matches("[A-Za-z_][0-9A-Za-z_]*")) alias else "\"" + alias + "\""
+            s"$column AS $printed"
+        }
+
+    /**
+      * @param alias Alias.
+      * @param column Column.
+      * @return True if alias equals, ignoring case, to column with every `'` removed, false otherwise.
+      */
+    private def isAliasEqualColumnName(alias: String, column: String): Boolean =
+        alias.equalsIgnoreCase(column.replace("'", ""))
+
+    /**
+      * @param from From type conversion.
+      * @param to To type conversion.
+      * @return True if cast is supported for the types, false otherwise (always false for unlisted source types).
+      */
+    private def castSupported(from: DataType, to: DataType): Boolean = from match {
+        //NOTE(review): `DecimalType(_, _)` below is placeholder syntax, so the sets hold a function
+        //rather than a data type and it looks like decimal targets never match - confirm the intent.
+        case BooleanType ⇒
+            Set[DataType](BooleanType, StringType)(to)
+
+        case ByteType ⇒
+            Set(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, StringType, DecimalType(_, _))(to)
+
+        case ShortType ⇒
+            Set(ShortType, IntegerType, LongType, FloatType, DoubleType, StringType, DecimalType(_, _))(to)
+
+        case IntegerType ⇒
+            Set(IntegerType, LongType, FloatType, DoubleType, StringType, DecimalType(_, _))(to)
+
+        case LongType ⇒
+            Set(LongType, FloatType, DoubleType, StringType, DecimalType(_, _))(to)
+
+        case FloatType ⇒
+            Set(FloatType, DoubleType, StringType, DecimalType(_, _))(to)
+
+        case DoubleType ⇒
+            Set(DoubleType, StringType, DecimalType(_, _))(to)
+
+        case DecimalType() ⇒
+            Set(StringType, DecimalType(_, _))(to)
+
+        case DateType ⇒
+            Set[DataType](DateType, StringType, LongType, TimestampType)(to)
+
+        case TimestampType ⇒
+            Set[DataType](TimestampType, DateType, StringType, LongType)(to)
+
+        case StringType ⇒
+            Set(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType,
+                DecimalType(_, _), DateType, TimestampType, StringType)(to)
+
+        //Binary, arrays and every other source type: without this case they would end in a MatchError.
+        case _ ⇒
+            false
+    }
+
+    /**
+      * Date format built-in Ignite. `SimpleDateFormat` is not synchronized, hence an instance per thread.
+      */
+    private val dateFormat: ThreadLocal[SimpleDateFormat] =
+        new ThreadLocal[SimpleDateFormat] {
+            override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
+        }
+
+    /**
+      * Timestamp format built-in Ignite. `SimpleDateFormat` is not synchronized, hence an instance per thread.
+      */
+    private val timestampFormat: ThreadLocal[SimpleDateFormat] =
+        new ThreadLocal[SimpleDateFormat] {
+            override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
+        }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala
new file mode 100644
index 0000000..1ecab2c
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, _}
+
+/**
+  * Object to support expressions to work with strings like `length` or `trim`.
+  */
+private[optimization] object StringExpressions extends SupportedExpressions {
+    /**
+      * @param expr Expression to check.
+      * @param checkChild Closure to check child expression.
+      * @return True if `expr` is a string expression known to this object with supported operands.
+      */
+    def apply(expr: Expression, checkChild: (Expression) ⇒ Boolean): Boolean = expr match {
+        case Ascii(child) ⇒
+            checkChild(child)
+
+        case Length(child) ⇒
+            checkChild(child)
+        //Variadic functions: every argument has to be supported.
+        case Concat(children) ⇒
+            children.forall(checkChild)
+        case ConcatWs(children) ⇒
+            children.forall(checkChild)
+
+        case StringInstr(str, substr) ⇒
+            checkChild(str) && checkChild(substr)
+
+        case Lower(child) ⇒
+            checkChild(child)
+
+        case Upper(child) ⇒
+            checkChild(child)
+
+        case StringLocate(substr, str, start) ⇒
+            checkChild(substr) && checkChild(str) && checkChild(start)
+
+        case StringLPad(str, len, pad) ⇒
+            checkChild(str) && checkChild(len) && checkChild(pad)
+
+        case StringRPad(str, len, pad) ⇒
+            checkChild(str) && checkChild(len) && checkChild(pad)
+
+        case StringTrimLeft(child) ⇒
+            checkChild(child)
+
+        case StringTrimRight(child) ⇒
+            checkChild(child)
+
+        case StringTrim(child) ⇒
+            checkChild(child)
+
+        case RegExpReplace(subject, regexp, rep) ⇒
+            checkChild(subject) && checkChild(regexp) && checkChild(rep)
+
+        case StringRepeat(str, times) ⇒
+            checkChild(str) && checkChild(times)
+
+        case SoundEx(child) ⇒
+            checkChild(child)
+
+        case StringSpace(child) ⇒
+            checkChild(child)
+
+        case Substring(str, pos, len) ⇒
+            checkChild(str) && checkChild(pos) && checkChild(len)
+
+        case StringTranslate(str, strMatch, strReplace) ⇒
+            checkChild(str) && checkChild(strMatch) && checkChild(strReplace)
+
+        case _ ⇒ false
+    }
+
+    /**
+      * Renders a supported string expression as SQL text.
+      *
+      * @param expr Expression to convert to string.
+      * @param childToString Closure to convert children expressions.
+      * @param useQualifier Not used by this object.
+      * @param useAlias Not used by this object.
+      * @return SQL representation of `expr` if it is supported, `None` otherwise.
+      */
+    override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
+        useAlias: Boolean): Option[String] = {
+        //Renders `name(arg1, arg2, ...)` with the arguments converted in the given order.
+        def call(name: String, args: Expression*): Option[String] =
+            Some(s"$name(${args.map(childToString).mkString(", ")})")
+
+        expr match {
+            case Ascii(child) ⇒
+                call("ASCII", child)
+            case Length(child) ⇒
+                Some(s"CAST(LENGTH(${childToString(child)}) AS INTEGER)")
+
+            case Concat(children) ⇒
+                call("CONCAT", children: _*)
+            case ConcatWs(children) ⇒
+                call("CONCAT_WS", children: _*)
+
+            case StringInstr(str, substr) ⇒
+                call("POSITION", substr, str)
+            case Lower(child) ⇒
+                call("LOWER", child)
+            case Upper(child) ⇒
+                call("UPPER", child)
+
+            case StringLocate(substr, str, start) ⇒
+                call("LOCATE", substr, str, start)
+            case StringLPad(str, len, pad) ⇒
+                call("LPAD", str, len, pad)
+            case StringRPad(str, len, pad) ⇒
+                call("RPAD", str, len, pad)
+
+            case StringTrimLeft(child) ⇒
+                call("LTRIM", child)
+            case StringTrimRight(child) ⇒
+                call("RTRIM", child)
+            case StringTrim(child) ⇒
+                call("TRIM", child)
+
+            case RegExpReplace(subject, regexp, rep) ⇒
+                call("REGEXP_REPLACE", subject, regexp, rep)
+            case StringRepeat(str, times) ⇒
+                call("REPEAT", str, times)
+            case SoundEx(child) ⇒
+                call("SOUNDEX", child)
+            case StringSpace(child) ⇒
+                call("SPACE", child)
+            case Substring(str, pos, len) ⇒
+                call("SUBSTR", str, pos, len)
+            case StringTranslate(str, strMatch, strReplace) ⇒
+                call("TRANSLATE", str, strMatch, strReplace)
+
+            case _ ⇒ None
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala
new file mode 100644
index 0000000..f46eb72
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+/**
+  * Contract for a group of Spark SQL expressions that can be expressed in Ignite SQL syntax.
+  */
+private[optimization] trait SupportedExpressions {
+    /**
+      * Checks whether the given expression is supported.
+      *
+      * @param expr Expression to check.
+      * @param checkChild Closure used to check a child expression.
+      * @return `true` if `expr` is supported, `false` otherwise.
+      */
+    def apply(expr: Expression, checkChild: (Expression) ⇒ Boolean): Boolean
+
+    /**
+      * Converts the given expression to its SQL representation.
+      *
+      * @param expr Expression to convert to string.
+      * @param childToString Closure used to convert child expressions.
+      * @param useQualifier If `true`, `expr` should be printed using a qualifier, e.g. `Table1.id`.
+      * @param useAlias If `true`, `expr` should be printed with an alias, e.g. `name as person_name`.
+      * @return SQL representation of `expr` if it is supported, `None` otherwise.
+      */
+    def toString(expr: Expression, childToString: (Expression) ⇒ String, useQualifier: Boolean,
+        useAlias: Boolean): Option[String]
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
new file mode 100644
index 0000000..40e4e29
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization
+
+import org.apache.ignite.IgniteException
+import org.apache.spark.sql.catalyst.expressions.{Coalesce, EqualTo, Expression, Greatest, If, IfNull, IsNotNull, IsNull, Least, Literal, NullIf, Nvl2}
+
+/**
+  * Support for built-in null-handling and comparison expressions such as `coalesce`, `nvl2` or `greatest`.
+  *
+  * Besides the dedicated expression classes, three shapes of `If` are recognised:
+  * `If(IsNotNull(x), a, b)`, `If(IsNull(x), a, b)` and `If(EqualTo(l, r), null, l-or-r)`.
+  */
+private[optimization] object SystemExpressions extends SupportedExpressions {
+    /** @inheritdoc */
+    override def apply(expr: Expression, checkChild: Expression ⇒ Boolean): Boolean = expr match {
+        case Coalesce(args) ⇒
+            args.forall(checkChild)
+
+        case Greatest(args) ⇒
+            args.forall(checkChild)
+
+        case Least(args) ⇒
+            args.forall(checkChild)
+
+        case IfNull(value, replacement, _) ⇒
+            Seq(value, replacement).forall(checkChild)
+
+        case NullIf(first, second, _) ⇒
+            Seq(first, second).forall(checkChild)
+
+        case Nvl2(tested, whenNotNull, whenNull, _) ⇒
+            Seq(tested, whenNotNull, whenNull).forall(checkChild)
+
+        case If(IsNotNull(tested), whenTrue, whenFalse) ⇒
+            Seq(tested, whenTrue, whenFalse).forall(checkChild)
+
+        case If(IsNull(tested), whenTrue, whenFalse) ⇒
+            Seq(tested, whenTrue, whenFalse).forall(checkChild)
+
+        // `If(l = r, null, x)` is only supported when `x` is one of the compared operands.
+        case If(EqualTo(lhs, rhs), Literal(null, _), whenFalse) ⇒
+            (lhs == whenFalse || rhs == whenFalse) && checkChild(lhs) && checkChild(rhs)
+
+        case _ ⇒
+            false
+    }
+
+    /** @inheritdoc */
+    override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
+        useAlias: Boolean): Option[String] = {
+        // Renders `NAME(arg1, arg2, ...)`, converting the arguments from left to right.
+        def sqlCall(name: String, args: Expression*): Option[String] =
+            Some(s"$name(${args.map(childToString).mkString(", ")})")
+
+        expr match {
+            case Coalesce(args) ⇒
+                sqlCall("COALESCE", args: _*)
+
+            case Greatest(args) ⇒
+                sqlCall("GREATEST", args: _*)
+
+            case Least(args) ⇒
+                sqlCall("LEAST", args: _*)
+
+            case IfNull(value, replacement, _) ⇒
+                sqlCall("IFNULL", value, replacement)
+
+            case NullIf(first, second, _) ⇒
+                sqlCall("NULLIF", first, second)
+
+            case Nvl2(tested, whenNotNull, whenNull, _) ⇒
+                sqlCall("NVL2", tested, whenNotNull, whenNull)
+
+            case If(IsNotNull(tested), whenTrue, whenFalse) ⇒
+                sqlCall("NVL2", tested, whenTrue, whenFalse)
+
+            // Branches are swapped because the predicate is the negation of the `NVL2` test.
+            case If(IsNull(tested), whenTrue, whenFalse) ⇒
+                sqlCall("NVL2", tested, whenFalse, whenTrue)
+
+            case If(EqualTo(lhs, rhs), Literal(null, _), whenFalse) if lhs == whenFalse ⇒
+                sqlCall("NULLIF", lhs, rhs)
+
+            case If(EqualTo(lhs, rhs), Literal(null, _), whenFalse) if rhs == whenFalse ⇒
+                sqlCall("NULLIF", rhs, lhs)
+
+            // Any other `If` shape can't be printed by this object.
+            case _: If ⇒
+                throw new IgniteException(s"Expression not supported. $expr")
+
+            case _ ⇒
+                None
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala
new file mode 100644
index 0000000..7ae5e70
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization.accumulator
+
+import org.apache.ignite.IgniteException
+import org.apache.ignite.spark.impl.optimization._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.BinaryNode
+import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftOuter, RightOuter}
+
+/**
+  * Accumulator that collects the parts of a join between two accumulators and renders them as a SQL query.
+  */
+private[apache] case class JoinSQLAccumulator(
+    igniteQueryContext: IgniteQueryContext,
+    left: QueryAccumulator,
+    right: QueryAccumulator,
+    joinType: JoinType,
+    outputExpressions: Seq[NamedExpression],
+    condition: Option[Expression],
+    leftAlias: Option[String],
+    rightAlias: Option[String],
+    distinct: Boolean = false,
+    where: Option[Seq[Expression]] = None,
+    groupBy: Option[Seq[Expression]] = None,
+    having: Option[Seq[Expression]] = None,
+    limit: Option[Expression] = None,
+    localLimit: Option[Expression] = None,
+    orderBy: Option[Seq[SortOrder]] = None
+) extends BinaryNode with SelectAccumulator {
+    /** @inheritdoc */
+    override def compileQuery(prettyPrint: Boolean = false): String = {
+        val (delim, tab) = if (prettyPrint) ("\n", "  ") else (" ", "")
+
+        val andSep = s" AND$delim$tab"
+        val commaSep = s",$delim$tab"
+
+        // Prints expressions with fixed qualifiers, joined by `sep`.
+        def render[T <: Expression](exprs: Seq[T], sep: String): String =
+            fixQualifier(exprs).map(exprToString(_, useQualifier = true)).mkString(sep)
+
+        // Optional query part: produced only when there is at least one expression to print.
+        def clause[T <: Expression](header: String, exprs: Option[Seq[T]], sep: String): Option[String] =
+            exprs.filter(_.nonEmpty).map(nonEmpty ⇒ s"$delim$header${render(nonEmpty, sep)}")
+
+        val selectSql = s"SELECT$delim$tab${render(outputExpressions, ", ")}$delim"
+        val fromSql = s"FROM$delim$tab$compileJoinExpr"
+
+        Seq(
+            Some(selectSql),
+            Some(fromSql),
+            clause(s"WHERE$delim$tab", Some(allFilters), andSep),
+            clause("GROUP BY ", groupBy, commaSep),
+            clause("HAVING ", having, andSep),
+            clause("ORDER BY ", orderBy, commaSep),
+            limit.map(lim ⇒ s" LIMIT ${exprToString(fixQualifier0(lim), useQualifier = true)}")
+        ).flatten.mkString
+    }
+
+    /**
+      * @param acc Accumulator to inspect.
+      * @return `acc` as a single table accumulator if `isSimpleTableAcc` holds for it, `None` otherwise.
+      */
+    private def asSimpleTable(acc: QueryAccumulator): Option[SingleTableSQLAccumulator] =
+        if (isSimpleTableAcc(acc))
+            Some(acc.asInstanceOf[SingleTableSQLAccumulator])
+        else
+            None
+
+    /**
+      * @return Name the left side of the join is referred by: its own qualifier for a simple table,
+      *         `leftAlias` otherwise.
+      */
+    private def leftQualifier: String =
+        if (isSimpleTableAcc(left))
+            left.qualifier
+        else
+            leftAlias.get
+
+    /**
+      * @return Filters of this query followed by the filters of the left and right simple table sides.
+      */
+    private def allFilters: Seq[Expression] = {
+        def sideFilters(acc: QueryAccumulator): Seq[Expression] =
+            asSimpleTable(acc).flatMap(_.where).getOrElse(Seq.empty)
+
+        where.getOrElse(Seq.empty) ++ sideFilters(left) ++ sideFilters(right)
+    }
+
+    /**
+      * @return `table1 LEFT JOIN (SELECT....FROM...) table2` part of join query.
+      */
+    private def compileJoinExpr: String = {
+        val leftSql = asSimpleTable(left)
+            .map(_.table.get)
+            .getOrElse(s"(${left.compileQuery()}) ${leftAlias.get}")
+
+        val leftName = leftQualifier
+
+        val rightSql = asSimpleTable(right) match {
+            case Some(rightTable) ⇒
+                val rightName = rightTable.table.get
+
+                // Self join: the right side needs an alias to be distinguishable from the left one.
+                if (leftName == rightName)
+                    s"$rightName as ${rightAlias.get}"
+                else
+                    rightName
+
+            case None ⇒
+                s"(${right.compileQuery()}) ${rightAlias.get}"
+        }
+
+        s"$leftSql $joinTypeSQL $rightSql" +
+            condition.fold("")(cond ⇒ s" ON ${exprToString(fixQualifier0(cond), useQualifier = true)}")
+    }
+
+    /**
+      * @return SQL string representing specific join type.
+      */
+    private def joinTypeSQL = joinType match {
+        case Inner ⇒ "JOIN"
+
+        case LeftOuter ⇒ "LEFT JOIN"
+
+        case RightOuter ⇒ "RIGHT JOIN"
+
+        case _ ⇒ throw new IgniteException(s"Unsupported join type $joinType")
+    }
+
+    /**
+      * Changes table qualifier in case of embedded query.
+      *
+      * @param exprs Expressions to fix.
+      * @tparam T Type of input expressions.
+      * @return Copy of `exprs` with fixed qualifiers.
+      */
+    private def fixQualifier[T <: Expression](exprs: Seq[T]): Seq[T] =
+        exprs.map(expr ⇒ fixQualifier0(expr))
+
+    /**
+      * Changes table qualifier for single expression.
+      *
+      * @param expr Expression to fix.
+      * @tparam T Type of input expression.
+      * @return Copy of `expr` with fixed qualifier.
+      */
+    private def fixQualifier0[T <: Expression](expr: T): T = expr match {
+        case attr: AttributeReference ⇒
+            val fixed = attr.withQualifier(Some(findQualifier(attr)))
+
+            fixed.asInstanceOf[T]
+
+        case other ⇒
+            // Not an attribute itself: rebuild the expression on top of its fixed children.
+            val fixedChildren = fixQualifier(other.children)
+
+            other.withNewChildren(fixedChildren).asInstanceOf[T]
+    }
+
+    /**
+      * Finds the qualifier `attr` has to be printed with inside this join.
+      *
+      * @param attr Attribute to find the qualifier for.
+      * @return Qualifier of the join side `attr` is selected from.
+      */
+    private def findQualifier(attr: AttributeReference): String = {
+        val leftName = leftQualifier
+
+        val fromLeft = left.outputExpressions.exists(_.exprId == attr.exprId)
+
+        if (fromLeft)
+            leftName
+        else if (!isSimpleTableAcc(right) || right.qualifier == leftName)
+            rightAlias.get
+        else
+            right.qualifier
+    }
+
+    /** @inheritdoc */
+    override def simpleString: String =
+        s"JoinSQLAccumulator(joinType: $joinType, columns: $outputExpressions, condition: $condition)"
+
+    /** @inheritdoc */
+    override def withOutputExpressions(outputExpressions: Seq[NamedExpression]): SelectAccumulator =
+        copy(outputExpressions = outputExpressions)
+
+    /** @inheritdoc */
+    override def withDistinct(distinct: Boolean): JoinSQLAccumulator = copy(distinct = distinct)
+
+    /** @inheritdoc */
+    override def withWhere(where: Seq[Expression]): JoinSQLAccumulator = copy(where = Some(where))
+
+    /** @inheritdoc */
+    override def withGroupBy(groupBy: Seq[Expression]): JoinSQLAccumulator = copy(groupBy = Some(groupBy))
+
+    /** @inheritdoc */
+    override def withHaving(having: Seq[Expression]): JoinSQLAccumulator = copy(having = Some(having))
+
+    /** @inheritdoc */
+    override def withLimit(limit: Expression): JoinSQLAccumulator = copy(limit = Some(limit))
+
+    /** @inheritdoc */
+    override def withLocalLimit(localLimit: Expression): JoinSQLAccumulator = copy(localLimit = Some(localLimit))
+
+    /** @inheritdoc */
+    override def withOrderBy(orderBy: Seq[SortOrder]): JoinSQLAccumulator = copy(orderBy = Some(orderBy))
+
+    /** @inheritdoc */
+    override def output: Seq[Attribute] = outputExpressions.map(expr ⇒ toAttributeReference(expr, Seq.empty))
+
+    /** @inheritdoc */
+    override lazy val qualifier: String = igniteQueryContext.uniqueTableAlias
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala
new file mode 100644
index 0000000..133d355
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization.accumulator
+
+import org.apache.ignite.spark.impl.optimization.IgniteQueryContext
+import org.apache.spark.sql.catalyst.expressions.{NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+  * Generic query info accumulator interface.
+  *
+  * An accumulator is a `LogicalPlan` node that can be compiled to a SQL query string.
+  */
+private[apache] trait QueryAccumulator extends LogicalPlan {
+    /**
+      * @return Ignite query context.
+      */
+    def igniteQueryContext: IgniteQueryContext
+
+    /**
+      * @return Expressions that form the output of the query.
+      */
+    def outputExpressions: Seq[NamedExpression]
+
+    /**
+      * @return Ordering info, `None` if no ordering was set.
+      */
+    def orderBy: Option[Seq[SortOrder]]
+
+    /**
+      * @param outputExpressions New output expressions.
+      * @return Copy of this accumulator with new output.
+      */
+    def withOutputExpressions(outputExpressions: Seq[NamedExpression]): QueryAccumulator
+
+    /**
+      * @param orderBy New ordering.
+      * @return Copy of this accumulator with new order.
+      */
+    def withOrderBy(orderBy: Seq[SortOrder]): QueryAccumulator
+
+    /**
+      * @param prettyPrint If `true`, a human readable query will be generated.
+      * @return SQL query.
+      */
+    def compileQuery(prettyPrint: Boolean = false): String
+
+    /**
+      * @return Qualifier that should be used to select data from this accumulator.
+      */
+    def qualifier: String
+
+    /**
+      * Always `true`: all expressions are already resolved when the extra optimization is executed.
+      */
+    override lazy val resolved = true
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SelectAccumulator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SelectAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SelectAccumulator.scala
new file mode 100644
index 0000000..c1db6f9
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SelectAccumulator.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization.accumulator
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+/**
+  * Generic interface for a SELECT query.
+  *
+  * Every `with...` method returns a copy of the accumulator with the corresponding query part set.
+  */
+private[apache] trait SelectAccumulator extends QueryAccumulator {
+    /**
+      * @return Expressions for the HAVING part of the query, `None` if not set.
+      */
+    def having: Option[Seq[Expression]]
+
+    /**
+      * @return Expressions for the WHERE part of the query, `None` if not set.
+      */
+    def where: Option[Seq[Expression]]
+
+    /**
+      * @return Expressions for the GROUP BY part of the query, `None` if not set.
+      */
+    def groupBy: Option[Seq[Expression]]
+
+    /**
+      * @param distinct New value of the `distinct` flag.
+      * @return Copy of this accumulator with `distinct` flag.
+      */
+    def withDistinct(distinct: Boolean): SelectAccumulator
+
+    /**
+      * @param where New WHERE expressions.
+      * @return Copy of this accumulator with `where` expressions.
+      */
+    def withWhere(where: Seq[Expression]): SelectAccumulator
+
+    /**
+      * @param groupBy New GROUP BY expressions.
+      * @return Copy of this accumulator with `groupBy` expressions.
+      */
+    def withGroupBy(groupBy: Seq[Expression]): SelectAccumulator
+
+    /**
+      * @param having New HAVING expressions.
+      * @return Copy of this accumulator with `having` expressions.
+      */
+    def withHaving(having: Seq[Expression]): SelectAccumulator
+
+    /**
+      * @param limit New limit expression.
+      * @return Copy of this accumulator with `limit` expression.
+      */
+    def withLimit(limit: Expression): SelectAccumulator
+
+    /**
+      * @param localLimit New local limit expression.
+      * @return Copy of this accumulator with `localLimit` expression.
+      */
+    def withLocalLimit(localLimit: Expression): SelectAccumulator
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala
new file mode 100644
index 0000000..47035b9
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization.accumulator
+
+import org.apache.ignite.IgniteException
+import org.apache.ignite.spark.impl.optimization._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+  * Accumulator for the parts of a SQL query that selects either from a single Ignite table
+  * or from a nested query used as a table expression.
+  *
+  * See <a href="http://www.h2database.com/html/grammar.html#select">select syntax of H2</a>.
+  */
+private[apache] case class SingleTableSQLAccumulator(
+    igniteQueryContext: IgniteQueryContext,
+    table: Option[String],
+    tableExpression: Option[(QueryAccumulator, String)],
+    outputExpressions: Seq[NamedExpression],
+    distinct: Boolean = false,
+    all: Boolean = false,
+    where: Option[Seq[Expression]] = None,
+    groupBy: Option[Seq[Expression]] = None,
+    having: Option[Seq[Expression]] = None,
+    limit: Option[Expression] = None,
+    localLimit: Option[Expression] = None,
+    orderBy: Option[Seq[SortOrder]] = None
+) extends SelectAccumulator {
+    /** @inheritdoc */
+    override def compileQuery(prettyPrint: Boolean = false): String = {
+        val (delim, tab) = if (prettyPrint) ("\n", "  ") else (" ", "")
+
+        val andSep = s" AND$delim$tab"
+        val commaSep = s",$delim$tab"
+
+        // Prints expressions joined by `sep`.
+        def render(exprs: Seq[Expression], sep: String): String =
+            exprs.map(expr ⇒ exprToString(expr)).mkString(sep)
+
+        // Optional query part: produced only when there is at least one expression to print.
+        def clause(header: String, exprs: Option[Seq[Expression]], sep: String): Option[String] =
+            exprs.filter(_.nonEmpty).map(nonEmpty ⇒ s"$delim$header${render(nonEmpty, sep)}")
+
+        Seq(
+            Some(s"SELECT$delim$tab${render(outputExpressions, ", ")}$delim"),
+            Some(s"FROM$delim$tab$compiledTableExpression"),
+            clause(s"WHERE$delim$tab", where, andSep),
+            clause("GROUP BY ", groupBy, commaSep),
+            clause("HAVING ", having, andSep),
+            clause("ORDER BY ", orderBy, commaSep),
+            limit.map(lim ⇒ s" LIMIT ${exprToString(lim)}")
+        ).flatten.mkString
+    }
+
+    /**
+      * @return FROM part of the query: the table name if it is set, the nested query with its alias otherwise.
+      */
+    private def compiledTableExpression: String =
+        table
+            .orElse(tableExpression.map { case (acc, alias) ⇒ s"(${acc.compileQuery()}) $alias" })
+            .getOrElse(throw new IgniteException("Unknown table."))
+
+    /** @inheritdoc */
+    override def simpleString: String =
+        Seq(
+            ("table", table),
+            ("columns", outputExpressions),
+            ("distinct", distinct),
+            ("all", all),
+            ("where", where),
+            ("groupBy", groupBy),
+            ("having", having),
+            ("limit", limit),
+            ("orderBy", orderBy)
+        ).map { case (label, value) ⇒ s"$label: $value" }.mkString("IgniteSQLAccumulator(", ", ", ")")
+
+    /** @inheritdoc */
+    override def withOutputExpressions(outputExpressions: Seq[NamedExpression]): SelectAccumulator =
+        copy(outputExpressions = outputExpressions)
+
+    /** @inheritdoc */
+    override def withDistinct(distinct: Boolean): SingleTableSQLAccumulator = copy(distinct = distinct)
+
+    /** @inheritdoc */
+    override def withWhere(where: Seq[Expression]): SingleTableSQLAccumulator = copy(where = Some(where))
+
+    /** @inheritdoc */
+    override def withGroupBy(groupBy: Seq[Expression]): SingleTableSQLAccumulator = copy(groupBy = Some(groupBy))
+
+    /** @inheritdoc */
+    override def withHaving(having: Seq[Expression]): SingleTableSQLAccumulator = copy(having = Some(having))
+
+    /** @inheritdoc */
+    override def withLimit(limit: Expression): SingleTableSQLAccumulator = copy(limit = Some(limit))
+
+    /** @inheritdoc */
+    override def withLocalLimit(localLimit: Expression): SingleTableSQLAccumulator = copy(localLimit = Some(localLimit))
+
+    /** @inheritdoc */
+    override def withOrderBy(orderBy: Seq[SortOrder]): SingleTableSQLAccumulator = copy(orderBy = Some(orderBy))
+
+    /** @inheritdoc */
+    override def output: Seq[Attribute] = outputExpressions.map(expr ⇒ toAttributeReference(expr, Seq.empty))
+
+    /** @inheritdoc */
+    override def qualifier: String = table.getOrElse(tableExpression.get._2)
+
+    /** @inheritdoc */
+    override def children: Seq[LogicalPlan] = tableExpression.map(_._1).toList
+}