Posted to commits@ignite.apache.org by vk...@apache.org on 2017/12/29 21:13:36 UTC

[2/2] ignite git commit: IGNITE-3084 - Ignite data source implementation for Spark data frame API - Fixes #2742

IGNITE-3084 - Ignite data source implementation for Spark data frame API - Fixes #2742

Signed-off-by: Valentin Kulichenko <va...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/44428f31
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/44428f31
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/44428f31

Branch: refs/heads/master
Commit: 44428f31817f1f67823373dacaf178228c02ed6c
Parents: a3b8324
Author: Nikolay Izhikov <ni...@gmail.com>
Authored: Fri Dec 29 13:13:26 2017 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Fri Dec 29 13:13:26 2017 -0800

----------------------------------------------------------------------
 .../examples/spark/IgniteCatalogExample.scala   | 125 +++++++
 .../examples/spark/IgniteDataFrameExample.scala | 146 ++++++++
 .../apache/ignite/examples/spark/package.scala  |  37 ++
 .../spark/examples/IgniteDataFrameSelfTest.java |  44 +++
 .../IgniteExamplesSparkSelfTestSuite.java       |   2 +
 modules/spark-2.10/pom.xml                      | 205 +++++++++--
 modules/spark/pom.xml                           |  31 +-
 ....apache.spark.sql.sources.DataSourceRegister |   1 +
 .../org/apache/ignite/spark/IgniteContext.scala |  44 ++-
 .../ignite/spark/IgniteDataFrameSettings.scala  |  56 +++
 .../org/apache/ignite/spark/IgniteRDD.scala     |  65 ++--
 .../spark/impl/IgniteDataFramePartition.scala   |  31 ++
 .../ignite/spark/impl/IgnitePartition.scala     |   2 +-
 .../spark/impl/IgniteRelationProvider.scala     |  90 +++++
 .../spark/impl/IgniteSQLDataFrameRDD.scala      |  82 +++++
 .../ignite/spark/impl/IgniteSQLRelation.scala   | 227 ++++++++++++
 .../apache/ignite/spark/impl/IgniteSqlRDD.scala |  16 +-
 .../org/apache/ignite/spark/impl/package.scala  | 132 +++++++
 .../sql/ignite/IgniteExternalCatalog.scala      | 305 +++++++++++++++++
 .../spark/sql/ignite/IgniteSharedState.scala    |  43 +++
 .../spark/sql/ignite/IgniteSparkSession.scala   | 343 +++++++++++++++++++
 modules/spark/src/test/resources/cities.json    |   3 +
 .../src/test/resources/ignite-spark-config.xml  |  64 ++++
 .../ignite/spark/AbstractDataFrameSpec.scala    | 164 +++++++++
 .../apache/ignite/spark/IgniteCatalogSpec.scala | 160 +++++++++
 .../spark/IgniteDataFrameSchemaSpec.scala       |  88 +++++
 .../ignite/spark/IgniteDataFrameSuite.scala     |  30 ++
 .../spark/IgniteDataFrameWrongConfigSpec.scala  |  53 +++
 .../ignite/spark/IgniteSQLDataFrameSpec.scala   | 288 ++++++++++++++++
 parent/pom.xml                                  |   7 +-
 30 files changed, 2805 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
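
A note for readers skimming the diff: the commit teaches Spark to load Ignite SQL tables as DataFrames through a new "ignite" data source. A minimal usage sketch, mirroring the examples added below and assuming a reachable Ignite cluster (the config path is a placeholder):

    import org.apache.spark.sql.SparkSession
    import org.apache.ignite.spark.IgniteDataFrameSettings._

    val spark = SparkSession.builder().master("local").getOrCreate()

    val personDF = spark.read
        .format(FORMAT_IGNITE)                             //Data source added by this commit.
        .option(OPTION_CONFIG_FILE, "example-ignite.xml")  //Placeholder path to an Ignite config file.
        .option(OPTION_TABLE, "person")                    //Existing Ignite SQL table.
        .load()

    personDF.filter(personDF("id") >= 2).show()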


http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteCatalogExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteCatalogExample.scala b/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteCatalogExample.scala
new file mode 100644
index 0000000..4382e10
--- /dev/null
+++ b/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteCatalogExample.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.spark
+
+import java.lang.{Long ⇒ JLong}
+
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.{Ignite, Ignition}
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.sql.ignite.IgniteSparkSession
+
+/**
+  * Example application showing the use of the Ignite implementation of Spark SQL {@link org.apache.spark.sql.catalog.Catalog}.
+  * The catalog automatically resolves SQL tables created in Ignite.
+  */
+object IgniteCatalogExample extends App {
+    /**
+      * Ignite config file.
+      */
+    private val CONFIG = "examples/config/example-ignite.xml"
+
+    /**
+      * Test cache name.
+      */
+    private val CACHE_NAME = "testCache"
+
+    //Starting Ignite server node.
+    val ignite = setupServerAndData
+
+    closeAfter(ignite) { ignite ⇒
+        //Creating Ignite-specific implementation of Spark session.
+        val igniteSession = IgniteSparkSession.builder()
+            .appName("Spark Ignite catalog example")
+            .master("local")
+            .config("spark.executor.instances", "2")
+            .igniteConfig(CONFIG)
+            .getOrCreate()
+
+        //Adjust the logger to exclude the logs of no interest.
+        Logger.getRootLogger.setLevel(Level.ERROR)
+        Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
+
+        //Showing existing tables.
+        igniteSession.catalog.listTables().show()
+
+        //Showing `person` schema.
+        igniteSession.catalog.listColumns("person").show()
+
+        //Showing `city` schema.
+        igniteSession.catalog.listColumns("city").show()
+
+        //Selecting data through the Spark SQL engine.
+        val df = igniteSession.sql("SELECT * FROM person WHERE CITY_ID = 2")
+
+        df.printSchema()
+        df.show()
+
+        //Selecting data through the Spark SQL engine.
+        val df2 = igniteSession.sql("SELECT * FROM person p JOIN city c ON c.ID = p.CITY_ID WHERE p.CITY_ID = 2")
+
+        df2.printSchema()
+        df2.show()
+    }
+
+    /**
+      * Starts an Ignite server node, creates SQL tables and populates them with data.
+      *
+      * @return Ignite server node.
+      */
+    def setupServerAndData: Ignite = {
+        //Starting Ignite.
+        val ignite = Ignition.start(CONFIG)
+
+        //Creating cache.
+        val ccfg = new CacheConfiguration[Int, Int](CACHE_NAME).setSqlSchema("PUBLIC")
+
+        val cache = ignite.getOrCreateCache(ccfg)
+
+        //Create tables.
+        cache.query(new SqlFieldsQuery(
+            "CREATE TABLE city (id LONG PRIMARY KEY, name VARCHAR) WITH \"template=replicated\"")).getAll
+
+        cache.query(new SqlFieldsQuery(
+            "CREATE TABLE person (id LONG, name VARCHAR, city_id LONG, PRIMARY KEY (id, city_id)) " +
+                "WITH \"backups=1, affinityKey=city_id\"")).getAll
+
+        cache.query(new SqlFieldsQuery("CREATE INDEX on Person (city_id)")).getAll
+
+        println("Created database objects.")
+
+        //Inserting some data into the tables.
+        var qry = new SqlFieldsQuery("INSERT INTO city (id, name) VALUES (?, ?)")
+
+        cache.query(qry.setArgs(1L.asInstanceOf[JLong], "Forest Hill")).getAll
+        cache.query(qry.setArgs(2L.asInstanceOf[JLong], "Denver")).getAll
+        cache.query(qry.setArgs(3L.asInstanceOf[JLong], "St. Petersburg")).getAll
+
+        qry = new SqlFieldsQuery("INSERT INTO person (id, name, city_id) values (?, ?, ?)")
+
+        cache.query(qry.setArgs(1L.asInstanceOf[JLong], "John Doe", 3L.asInstanceOf[JLong])).getAll
+        cache.query(qry.setArgs(2L.asInstanceOf[JLong], "Jane Roe", 2L.asInstanceOf[JLong])).getAll
+        cache.query(qry.setArgs(3L.asInstanceOf[JLong], "Mary Major", 1L.asInstanceOf[JLong])).getAll
+        cache.query(qry.setArgs(4L.asInstanceOf[JLong], "Richard Miles", 2L.asInstanceOf[JLong])).getAll
+
+        println("Populated data.")
+
+        ignite
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameExample.scala b/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameExample.scala
new file mode 100644
index 0000000..ae84915
--- /dev/null
+++ b/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameExample.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.spark
+
+import java.lang.{Long ⇒ JLong, String ⇒ JString}
+
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.{Ignite, Ignition}
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+import org.apache.ignite.spark.IgniteDataFrameSettings._
+
+/**
+  * Example application showing use cases for the Ignite implementation of the Spark DataFrame API.
+  */
+object IgniteDataFrameExample extends App {
+    /**
+      * Ignite config file.
+      */
+    private val CONFIG = "examples/config/example-ignite.xml"
+
+    /**
+      * Test cache name.
+      */
+    private val CACHE_NAME = "testCache"
+
+    //Starting Ignite server node.
+    val ignite = setupServerAndData
+
+    closeAfter(ignite) { ignite ⇒
+        //Creating spark session.
+        implicit val spark = SparkSession.builder()
+            .appName("Spark Ignite data sources example")
+            .master("local")
+            .config("spark.executor.instances", "2")
+            .getOrCreate()
+
+        //Adjust the logger to exclude the logs of no interest.
+        Logger.getRootLogger.setLevel(Level.ERROR)
+        Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
+
+        //Executing examples.
+
+        sparkDSLExample
+
+        nativeSparkSqlExample
+    }
+
+    /**
+      * Example of using the Ignite DataFrame implementation:
+      * selecting data through the Spark DSL.
+      *
+      * @param spark SparkSession.
+      */
+    def sparkDSLExample(implicit spark: SparkSession): Unit = {
+        val igniteDF = spark.read
+            .format(FORMAT_IGNITE) //Data source type.
+            .option(OPTION_TABLE, "person") //Table to read.
+            .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
+            .load()
+            .filter(col("id") >= 2) //Filter clause.
+            .filter(col("name") like "%M%") //Another filter clause.
+
+        igniteDF.printSchema() //Printing query schema to console.
+        igniteDF.show() //Printing query results to console.
+    }
+
+    /**
+      * Example of using the Ignite DataFrame implementation:
+      * registering an Ignite DataFrame for subsequent use,
+      * then selecting data with a Spark SQL query.
+      *
+      * @param spark SparkSession.
+      */
+    def nativeSparkSqlExample(implicit spark: SparkSession): Unit = {
+        val df = spark.read
+            .format(FORMAT_IGNITE) //Data source type.
+            .option(OPTION_TABLE, "person") //Table to read.
+            .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
+            .load()
+
+        //Registering DataFrame as Spark view.
+        df.createOrReplaceTempView("person")
+
+        //Selecting data from Ignite through the Spark SQL engine.
+        val igniteDF = spark.sql("SELECT * FROM person WHERE id >= 2 AND name = 'Mary Major'")
+
+        igniteDF.printSchema() //Printing query schema to console.
+        igniteDF.show() //Printing query results to console.
+    }
+
+    def setupServerAndData: Ignite = {
+        //Starting Ignite.
+        val ignite = Ignition.start(CONFIG)
+
+        //Creating first test cache.
+        val ccfg = new CacheConfiguration[JLong, JString](CACHE_NAME).setSqlSchema("PUBLIC")
+
+        val cache = ignite.getOrCreateCache(ccfg)
+
+        //Creating SQL tables.
+        cache.query(new SqlFieldsQuery(
+            "CREATE TABLE city (id LONG PRIMARY KEY, name VARCHAR) WITH \"template=replicated\"")).getAll
+
+        cache.query(new SqlFieldsQuery(
+            "CREATE TABLE person (id LONG, name VARCHAR, city_id LONG, PRIMARY KEY (id, city_id)) " +
+                "WITH \"backups=1, affinityKey=city_id\"")).getAll
+
+        cache.query(new SqlFieldsQuery("CREATE INDEX on Person (city_id)")).getAll
+
+        //Inserting some data into the tables.
+        var qry = new SqlFieldsQuery("INSERT INTO city (id, name) VALUES (?, ?)")
+
+        cache.query(qry.setArgs(1L.asInstanceOf[JLong], "Forest Hill")).getAll
+        cache.query(qry.setArgs(2L.asInstanceOf[JLong], "Denver")).getAll
+        cache.query(qry.setArgs(3L.asInstanceOf[JLong], "St. Petersburg")).getAll
+
+        qry = new SqlFieldsQuery("INSERT INTO person (id, name, city_id) values (?, ?, ?)")
+
+        cache.query(qry.setArgs(1L.asInstanceOf[JLong], "John Doe", 3L.asInstanceOf[JLong])).getAll
+        cache.query(qry.setArgs(2L.asInstanceOf[JLong], "Jane Roe", 2L.asInstanceOf[JLong])).getAll
+        cache.query(qry.setArgs(3L.asInstanceOf[JLong], "Mary Major", 1L.asInstanceOf[JLong])).getAll
+        cache.query(qry.setArgs(4L.asInstanceOf[JLong], "Richard Miles", 2L.asInstanceOf[JLong])).getAll
+
+        println("Populated data.")
+
+        ignite
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/examples/src/main/spark/org/apache/ignite/examples/spark/package.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/spark/org/apache/ignite/examples/spark/package.scala b/examples/src/main/spark/org/apache/ignite/examples/spark/package.scala
new file mode 100644
index 0000000..a877d82
--- /dev/null
+++ b/examples/src/main/spark/org/apache/ignite/examples/spark/package.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.examples
+
+/**
+  */
+package object spark {
+    /**
+      * Utility object.
+      * Takes an `AutoCloseable` resource and a closure that works with it.
+      * After the work is done, the resource is closed.
+      */
+    object closeAfter {
+        def apply[R <: AutoCloseable, T](r: R)(c: (R) ⇒ T) = {
+            try {
+                c(r)
+            }
+            finally {
+                r.close
+            }
+        }
+    }
+}
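
A one-line usage sketch of the closeAfter helper above (any AutoCloseable works, and Ignite instances qualify; the config path is a placeholder):

    closeAfter(Ignition.start("example-ignite.xml")) { ignite ⇒
        println(ignite.name())  //Work with the resource; it is closed when the closure returns.
    }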

http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java
----------------------------------------------------------------------
diff --git a/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java b/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java
new file mode 100644
index 0000000..b18d870
--- /dev/null
+++ b/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples;
+
+import org.apache.ignite.examples.spark.IgniteCatalogExample;
+import org.apache.ignite.examples.spark.IgniteDataFrameExample;
+import org.junit.Test;
+
+/**
+ */
+public class IgniteDataFrameSelfTest {
+    static final String[] EMPTY_ARGS = new String[0];
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCatalogExample() throws Exception {
+        IgniteCatalogExample.main(EMPTY_ARGS);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDataFrameExample() throws Exception {
+        IgniteDataFrameExample.main(EMPTY_ARGS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/examples/src/test/spark/org/apache/ignite/spark/testsuites/IgniteExamplesSparkSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/examples/src/test/spark/org/apache/ignite/spark/testsuites/IgniteExamplesSparkSelfTestSuite.java b/examples/src/test/spark/org/apache/ignite/spark/testsuites/IgniteExamplesSparkSelfTestSuite.java
index 73b286a..9e1b2df 100644
--- a/examples/src/test/spark/org/apache/ignite/spark/testsuites/IgniteExamplesSparkSelfTestSuite.java
+++ b/examples/src/test/spark/org/apache/ignite/spark/testsuites/IgniteExamplesSparkSelfTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spark.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.spark.examples.IgniteDataFrameSelfTest;
 import org.apache.ignite.spark.examples.SharedRDDExampleSelfTest;
 import org.apache.ignite.testframework.GridTestUtils;
 
@@ -40,6 +41,7 @@ public class IgniteExamplesSparkSelfTestSuite extends TestSuite {
         TestSuite suite = new TestSuite("Ignite Examples Test Suite");
 
         suite.addTest(new TestSuite(SharedRDDExampleSelfTest.class));
+        suite.addTest(new TestSuite(IgniteDataFrameSelfTest.class));
 
         return suite;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark-2.10/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark-2.10/pom.xml b/modules/spark-2.10/pom.xml
index 326c8e1..b973afb 100644
--- a/modules/spark-2.10/pom.xml
+++ b/modules/spark-2.10/pom.xml
@@ -56,6 +56,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <version>${scala210.library.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.10</artifactId>
             <version>${spark.version}</version>
@@ -63,7 +69,7 @@
 
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-unsafe_2.10</artifactId>
+            <artifactId>spark-catalyst_2.10</artifactId>
             <version>${spark.version}</version>
         </dependency>
 
@@ -74,9 +80,9 @@
         </dependency>
 
         <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-            <version>${jackson2.version}</version>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-network-common_2.10</artifactId>
+            <version>${spark.version}</version>
         </dependency>
 
         <dependency>
@@ -86,73 +92,212 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${spark.hadoop.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-annotations</artifactId>
             <version>${jackson2.version}</version>
         </dependency>
 
         <dependency>
+            <groupId>org.json4s</groupId>
+            <artifactId>json4s-core_2.10</artifactId>
+            <version>3.5.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-indexing</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-spring</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-log4j</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_2.10</artifactId>
+            <version>2.2.6</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-tags_2.10</artifactId>
+            <artifactId>spark-unsafe_2.10</artifactId>
             <version>${spark.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-catalyst_2.10</artifactId>
+            <artifactId>spark-launcher_2.10</artifactId>
             <version>${spark.version}</version>
+            <scope>test</scope>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-tags_2.10</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.5</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>4.0.29.Final</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.esotericsoftware.kryo</groupId>
+            <artifactId>kryo</artifactId>
+            <version>2.20</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.twitter</groupId>
+            <artifactId>chill_2.10</artifactId>
+            <version>0.8.1</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.codahale.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>3.0.2</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-json</artifactId>
+            <version>3.1.2</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tomcat</groupId>
+            <artifactId>tomcat-servlet-api</artifactId>
+            <version>8.0.23</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.jersey.containers</groupId>
+            <artifactId>jersey-container-servlet-core</artifactId>
+            <version>2.25</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson2.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-scala_2.10</artifactId>
+            <version>${jackson2.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.xbean</groupId>
+            <artifactId>xbean-asm5-shaded</artifactId>
+            <version>4.5</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>net.jpountz.lz4</groupId>
+            <artifactId>lz4</artifactId>
+            <version>1.3.0</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-hadoop</artifactId>
+            <version>1.9.0</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-common</artifactId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
             <version>${hadoop.version}</version>
         </dependency>
 
         <dependency>
-            <groupId>org.json4s</groupId>
-            <artifactId>json4s-core_2.11</artifactId>
-            <version>3.5.0</version>
+            <groupId>org.codehaus.janino</groupId>
+            <artifactId>janino</artifactId>
+            <version>3.0.0</version>
+            <scope>test</scope>
         </dependency>
 
-        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.codehaus.janino</groupId>
+            <artifactId>commons-compiler</artifactId>
+            <version>3.0.0</version>
+            <scope>test</scope>
+        </dependency>
 
         <dependency>
-            <groupId>org.scalatest</groupId>
-            <artifactId>scalatest_2.10</artifactId>
-            <version>2.2.2</version>
+            <groupId>org.antlr</groupId>
+            <artifactId>antlr4-runtime</artifactId>
+            <version>4.5.3</version>
             <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.scala-lang</groupId>
-                    <artifactId>scala-library</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-indexing</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
+            <version>1.7.16</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava14.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
         <sourceDirectory>../spark/src/main/scala</sourceDirectory>
+        <testSourceDirectory>../spark/src/test/java</testSourceDirectory>
 
         <resources>
             <resource>
-                <directory>../spark/src/main/scala</directory>
-                <excludes>
-                    <exclude>**/*.scala</exclude>
-                </excludes>
+                <directory>../spark/src/main/resources</directory>
             </resource>
         </resources>
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index 3cd2e1c..458abd8 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -52,7 +52,13 @@
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
-            <version>2.11.8</version>
+            <version>${scala211.library.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <version>${scala211.library.version}</version>
         </dependency>
 
         <dependency>
@@ -88,7 +94,7 @@
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
+            <version>${spark.hadoop.version}</version>
         </dependency>
 
         <dependency>
@@ -156,12 +162,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-unsafe_2.10</artifactId>
-            <version>${spark.version}</version>
-        </dependency>
-
-        <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
             <version>3.5</version>
@@ -265,12 +265,27 @@
             <version>3.0.0</version>
             <scope>test</scope>
         </dependency>
+
         <dependency>
             <groupId>org.codehaus.janino</groupId>
             <artifactId>commons-compiler</artifactId>
             <version>3.0.0</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.antlr</groupId>
+            <artifactId>antlr4-runtime</artifactId>
+            <version>4.5.3</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
+            <version>1.7.16</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/modules/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..8304662
--- /dev/null
+++ b/modules/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.apache.ignite.spark.impl.IgniteRelationProvider
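
This single service entry is what makes the short format name work: Spark discovers DataSourceRegister implementations through java.util.ServiceLoader, so both of the following resolve to the same provider (a sketch, assuming the module is on the classpath):

    val short = spark.read.format("ignite")                                              //Short name via shortName().
    val full = spark.read.format("org.apache.ignite.spark.impl.IgniteRelationProvider")  //Fully-qualified class name.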

http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index edbf2be..ba1b244 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -21,9 +21,11 @@ import org.apache.ignite._
 import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
 import org.apache.ignite.internal.IgnitionEx
 import org.apache.ignite.internal.util.IgniteUtils
+import org.apache.ignite.spark.IgniteContext.setIgniteHome
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.SparkContext
 import org.apache.log4j.Logger
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 
 /**
  * Ignite context.
@@ -60,6 +62,14 @@ class IgniteContext(
     // Make sure to start Ignite on context creation.
     ignite()
 
+    //Stop local ignite instance on application end.
+    //Instances on workers will be stopped with executor stop(jvm exit).
+    sparkContext.addSparkListener(new SparkListener {
+        override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+            close()
+        }
+    })
+
     /**
      * Creates an instance of IgniteContext with the given spring configuration.
      *
@@ -129,15 +139,7 @@ class IgniteContext(
      * @return
      */
     def ignite(): Ignite = {
-        val home = IgniteUtils.getIgniteHome
-
-        if (home == null && igniteHome != null) {
-            Logging.log.info("Setting IGNITE_HOME from driver not as it is not available on this worker: " + igniteHome)
-
-            IgniteUtils.nullifyHomeDirectory()
-
-            System.setProperty(IgniteSystemProperties.IGNITE_HOME, igniteHome)
-        }
+        setIgniteHome(igniteHome)
 
         val igniteCfg = cfgClo()
 
@@ -159,7 +161,7 @@ class IgniteContext(
      * Stops supporting ignite instance. If ignite instance has been already stopped, this operation will be
      * a no-op.
      */
-    def close(shutdownIgniteOnWorkers: Boolean = false) = {
+    def close(shutdownIgniteOnWorkers: Boolean = false): Unit = {
         // additional check if called from driver
         if (sparkContext != null && shutdownIgniteOnWorkers) {
             // Get required number of executors with default equals to number of available executors.
@@ -180,7 +182,25 @@ class IgniteContext(
     private def doClose() = {
         val igniteCfg = cfgClo()
 
-        Ignition.stop(igniteCfg.getIgniteInstanceName, false)
+        if (Ignition.state(igniteCfg.getIgniteInstanceName) == IgniteState.STARTED)
+            Ignition.stop(igniteCfg.getIgniteInstanceName, false)
+    }
+}
+
+object IgniteContext {
+    def apply(sparkContext: SparkContext, cfgF: () ⇒ IgniteConfiguration, standalone: Boolean = true): IgniteContext =
+        new IgniteContext(sparkContext, cfgF, standalone)
+
+    def setIgniteHome(igniteHome: String): Unit = {
+        val home = IgniteUtils.getIgniteHome
+
+        if (home == null && igniteHome != null) {
+            Logging.log.info("Setting IGNITE_HOME from driver as it is not available on this worker: " + igniteHome)
+
+            IgniteUtils.nullifyHomeDirectory()
+
+            System.setProperty(IgniteSystemProperties.IGNITE_HOME, igniteHome)
+        }
     }
 }
 
@@ -189,7 +209,7 @@ class IgniteContext(
  *
  * @param clo Closure to wrap.
  */
-private class Once(clo: () ⇒ IgniteConfiguration) extends Serializable {
+class Once(clo: () ⇒ IgniteConfiguration) extends Serializable {
     @transient @volatile var res: IgniteConfiguration = null
 
     def apply(): IgniteConfiguration = {
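
A minimal sketch of the new companion-object factory (the configuration closure here is a trivial placeholder and sc is an existing SparkContext):

    import org.apache.ignite.configuration.IgniteConfiguration
    import org.apache.ignite.spark.IgniteContext

    val ic = IgniteContext(sc, () ⇒ new IgniteConfiguration())

    //With this commit, the driver-side instance also stops itself on application
    //end via the registered SparkListener, and doClose() is now a no-op if the
    //instance has already been stopped.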

http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
new file mode 100644
index 0000000..4261032
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+/**
+  */
+object IgniteDataFrameSettings {
+    /**
+      * Name of DataSource format for loading data from Apache Ignite.
+      */
+    val FORMAT_IGNITE = "ignite"
+
+    /**
+      * Config option to specify path to ignite config file.
+      * Config from this file will be used to connect to existing Ignite cluster.
+      *
+      * @note All nodes executing the Spark task will be forcibly started in client mode.
+      *
+      * @example {{{
+      * val igniteDF = spark.read.format(FORMAT_IGNITE)
+      *     .option(OPTION_CONFIG_FILE, CONFIG_FILE)
+      *     // other options ...
+      *     .load()
+      * }}}
+      */
+    val OPTION_CONFIG_FILE = "config"
+
+    /**
+      * Config option to specify Ignite SQL table name to load data from.
+      *
+      * @example {{{
+      * val igniteDF = spark.read.format(FORMAT_IGNITE)
+      *     // other options ...
+      *     .option(OPTION_TABLE, "mytable")
+      *     .load()
+      * }}}
+      *
+      * @see [[org.apache.ignite.cache.QueryEntity#tableName]]
+      */
+    val OPTION_TABLE = "table"
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 78e2223..fce47a6 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -26,10 +26,10 @@ import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata
 import org.apache.ignite.lang.IgniteUuid
 import org.apache.ignite.spark.impl._
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode
+import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types._
 import org.apache.spark.sql._
-import org.apache.spark._
+import org.apache.spark.sql.types._
 
 import scala.collection.JavaConversions._
 
@@ -314,17 +314,35 @@ class IgniteRDD[K, V] (
      * @return Spark schema.
      */
     private def buildSchema(fieldsMeta: java.util.List[GridQueryFieldMetadata]): StructType = {
-        new StructType(fieldsMeta.map(i ⇒ new StructField(i.fieldName(), dataType(i.fieldTypeName()), nullable = true))
+        new StructType(fieldsMeta.map(i ⇒
+            new StructField(i.fieldName(), IgniteRDD.dataType(i.fieldTypeName(), i.fieldName()), nullable = true))
             .toArray)
     }
 
     /**
-     * Gets Spark data type based on type name.
+     * Generates affinity key for given cluster node.
      *
-     * @param typeName Type name.
-     * @return Spark data type.
+     * @param value Value to generate key for.
+     * @param node Node to generate key for.
+     * @return Affinity key.
      */
-    private def dataType(typeName: String): DataType = typeName match {
+    private def affinityKeyFunc(value: V, node: ClusterNode): IgniteUuid = {
+        val aff = ic.ignite().affinity[IgniteUuid](cacheName)
+
+        Stream.from(1, Math.max(1000, aff.partitions() * 2))
+            .map(_ ⇒ IgniteUuid.randomUuid()).find(node == null || aff.mapKeyToNode(_).eq(node))
+            .getOrElse(IgniteUuid.randomUuid())
+    }
+}
+
+object IgniteRDD {
+    /**
+      * Gets Spark data type based on type name.
+      *
+      * @param typeName Type name.
+      * @return Spark data type.
+      */
+    def dataType(typeName: String, fieldName: String): DataType = typeName match {
         case "java.lang.Boolean" ⇒ BooleanType
         case "java.lang.Byte" ⇒ ByteType
         case "java.lang.Short" ⇒ ShortType
@@ -343,17 +361,24 @@ class IgniteRDD[K, V] (
     }
 
     /**
-     * Generates affinity key for given cluster node.
-     *
-     * @param value Value to generate key for.
-     * @param node Node to generate key for.
-     * @return Affinity key.
-     */
-    private def affinityKeyFunc(value: V, node: ClusterNode): IgniteUuid = {
-        val aff = ic.ignite().affinity[IgniteUuid](cacheName)
-
-        Stream.from(1, Math.max(1000, aff.partitions() * 2))
-            .map(_ ⇒ IgniteUuid.randomUuid()).find(node == null || aff.mapKeyToNode(_).eq(node))
-            .getOrElse(IgniteUuid.randomUuid())
-    }
+      * Converts java.util.Date to java.sql.Date, as java.util.Date is not supported by Spark SQL.
+      *
+      * @param input Any value.
+      * @return The java.sql.Date representation if the input is a java.util.Date, otherwise the unchanged value.
+      */
+    def convertIfNeeded(input: Any): Any =
+        if (input == null)
+            input
+        else {
+            input match {
+                case timestamp: java.sql.Timestamp ⇒
+                    timestamp
+
+                //Spark SQL doesn't support java.util.Date see - https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types
+                case date: java.util.Date ⇒
+                    new java.sql.Date(date.getTime)
+
+                case _ ⇒ input
+            }
+        }
 }
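
A quick illustration of the convertIfNeeded helper above (values are placeholders; java.sql.Timestamp is matched first, so it passes through unchanged even though it subclasses java.util.Date):

    IgniteRDD.convertIfNeeded(new java.util.Date(0L))                             //→ java.sql.Date for epoch 0.
    IgniteRDD.convertIfNeeded(java.sql.Timestamp.valueOf("2017-12-29 13:13:26"))  //→ unchanged Timestamp.
    IgniteRDD.convertIfNeeded("plain value")                                      //→ unchanged String.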

http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteDataFramePartition.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteDataFramePartition.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteDataFramePartition.scala
new file mode 100644
index 0000000..4c9c72e
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteDataFramePartition.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.cluster.ClusterNode
+import org.apache.spark.Partition
+
+/**
+  * DataFrame partition.
+  *
+  * sparkPartIdx - index of the Spark partition.
+  * primary - primary node for the list of igniteParts.
+  */
+case class IgniteDataFramePartition(sparkPartIdx: Int, primary: ClusterNode, igniteParts: List[Int]) extends Partition {
+    override def index: Int = sparkPartIdx
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala
index 2def636..2107a5f 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala
@@ -19,6 +19,6 @@ package org.apache.ignite.spark.impl
 
 import org.apache.spark.Partition
 
-class IgnitePartition(idx: Int) extends Partition {
+case class IgnitePartition(idx: Int) extends Partition {
     override def index: Int = idx
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
new file mode 100644
index 0000000..4762d8d
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.configuration.IgniteConfiguration
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.internal.util.IgniteUtils
+import org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE
+import org.apache.ignite.spark.IgniteContext
+import org.apache.ignite.spark.IgniteDataFrameSettings._
+import org.apache.ignite.IgniteException
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.ignite.IgniteExternalCatalog.OPTION_GRID
+import org.apache.spark.sql.sources._
+
+/**
+  * Apache Ignite relation provider.
+  */
+class IgniteRelationProvider extends RelationProvider with DataSourceRegister {
+    /**
+      * @return "ignite" - name of relation provider.
+      */
+    override def shortName(): String = FORMAT_IGNITE
+
+    /**
+      * To create an IgniteRelation we need a link to an Ignite cluster and a table name.
+      * To refer to the cluster, the user has to specify one of the following config parameters:
+      * <ul>
+      *     <li><code>config</code> - path to the Ignite configuration file.
+      *     <li><code>grid</code> - grid name. Note that the grid has to be started in the same JVM.
+      * </ul>
+      * An existing table inside Apache Ignite should be referred to via the <code>table</code> parameter.
+      *
+      * @param sqlCtx SQLContext.
+      * @param params Parameters for relation creation.
+      * @return IgniteRelation.
+      * @see IgniteRelation
+      * @see IgnitionEx#grid(String)
+      */
+    override def createRelation(sqlCtx: SQLContext, params: Map[String, String]): BaseRelation = {
+        val igniteHome = IgniteUtils.getIgniteHome
+
+        def configProvider: () ⇒ IgniteConfiguration = {
+            if (params.contains(OPTION_CONFIG_FILE))
+                () ⇒ {
+                    IgniteContext.setIgniteHome(igniteHome)
+
+                    val cfg = IgnitionEx.loadConfiguration(params(OPTION_CONFIG_FILE)).get1()
+
+                    cfg.setClientMode(true)
+
+                    cfg
+                }
+            else if (params.contains(OPTION_GRID))
+                () ⇒ {
+                    IgniteContext.setIgniteHome(igniteHome)
+
+                    val cfg = ignite(params(OPTION_GRID)).configuration()
+
+                    cfg.setClientMode(true)
+
+                    cfg
+                }
+            else
+                throw new IgniteException("'config' must be specified to connect to ignite cluster.")
+        }
+
+        val ic = IgniteContext(sqlCtx.sparkContext, configProvider)
+
+        if (params.contains(OPTION_TABLE))
+            IgniteSQLRelation(ic, params(OPTION_TABLE).toUpperCase, sqlCtx)
+        else
+            throw new IgniteException("'table' must be specified for loading ignite data.")
+    }
+}
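
A sketch of the two cluster-reference styles createRelation accepts (config path and grid name are placeholders; the named grid must already be running in the same JVM):

    //Option 1: connect via an Ignite configuration file.
    spark.read.format(FORMAT_IGNITE)
        .option(OPTION_CONFIG_FILE, "ignite-config.xml")
        .option(OPTION_TABLE, "person")
        .load()

    //Option 2: reuse a grid already started in this JVM.
    spark.read.format(FORMAT_IGNITE)
        .option("grid", "myGrid")
        .option(OPTION_TABLE, "person")
        .load()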

http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala
new file mode 100644
index 0000000..93ef529
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.cache.CacheMode
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.{Partition, TaskContext}
+import java.util.{List ⇒ JList}
+
+/**
+  * Implementation of Spark RDD for Apache Ignite to support the DataFrame API.
+  */
+class IgniteSQLDataFrameRDD[K, V](
+    ic: IgniteContext,
+    cacheName: String,
+    schema: StructType,
+    qryStr: String,
+    args: List[_],
+    parts: Array[Partition]) extends
+    IgniteSqlRDD[Row, JList[_], K, V](
+        ic,
+        cacheName,
+        cacheCfg = null,
+        qry = null,
+        r ⇒ new GenericRowWithSchema(r.toArray.map(IgniteRDD.convertIfNeeded), schema),
+        keepBinary = true,
+        parts) {
+
+    /**
+      * Executes an Ignite query for this RDD and returns an Iterator over the results.
+      *
+      * @param partition Partition.
+      * @param context   TaskContext.
+      * @return Results of query for specific partition.
+      */
+    override def compute(partition: Partition, context: TaskContext): Iterator[Row] = {
+        val qry0 = new SqlFieldsQuery(qryStr)
+
+        if (args.nonEmpty)
+            qry0.setArgs(args.map(_.asInstanceOf[Object]): _*)
+
+        val ccfg = ic.ignite().cache[K, V](cacheName).getConfiguration(classOf[CacheConfiguration[K, V]])
+
+        if (ccfg.getCacheMode != CacheMode.REPLICATED)
+            qry0.setPartitions(partition.asInstanceOf[IgniteDataFramePartition].igniteParts: _*)
+
+        qry = qry0
+
+        super.compute(partition, context)
+    }
+}
+
+object IgniteSQLDataFrameRDD {
+    def apply[K, V](ic: IgniteContext,
+        cacheName: String,
+        schema: StructType,
+        qryStr: String,
+        args: List[_],
+        parts: Array[Partition] = Array(IgnitePartition(0))) = {
+        new IgniteSQLDataFrameRDD(ic, cacheName, schema, qryStr, args, parts)
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
new file mode 100644
index 0000000..bdcf57b
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.{Ignite, IgniteException}
+import org.apache.ignite.cache.{CacheMode, QueryEntity}
+import org.apache.ignite.cluster.ClusterNode
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
+import org.apache.spark.Partition
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SQLContext}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * Apache Ignite implementation of Spark BaseRelation with PrunedFilteredScan for Ignite SQL tables.
+  */
+class IgniteSQLRelation[K, V](
+    private[spark] val ic: IgniteContext,
+    private[spark] val tableName: String)
+    (@transient val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {
+
+    /**
+      * @return Schema of Ignite SQL table.
+      */
+    override def schema: StructType =
+        igniteSQLTable(ic.ignite(), tableName)
+            .map(IgniteSQLRelation.schema)
+            .getOrElse(throw new IgniteException(s"Unknown table $tableName"))
+
+    /**
+      * Builds an Apache Ignite SQL query for the given table, columns and filters.
+      *
+      * @param columns Columns to select.
+      * @param filters Filters to apply.
+      * @return Apache Ignite RDD implementation.
+      */
+    override def buildScan(columns: Array[String], filters: Array[Filter]): RDD[Row] = {
+        val columnsStr =
+            if (columns.isEmpty)
+                "*"
+            else
+                columns.mkString(",")
+
+        //Creating corresponding Ignite SQL query.
+        //Query will be executed by Ignite SQL Engine.
+        val qryAndArgs = filters match {
+            case Array(_, _*) ⇒
+                val where = compileWhere(filters)
+                (s"SELECT $columnsStr FROM $tableName WHERE ${where._1}", where._2)
+            case _ ⇒
+                (s"SELECT $columnsStr FROM $tableName", List.empty)
+        }
+
+        IgniteSQLDataFrameRDD[K, V](ic, cacheName, schema, qryAndArgs._1, qryAndArgs._2, calcPartitions(filters))
+    }
+
+    override def toString = s"IgniteSQLRelation[table=$tableName]"
+
+    /**
+      * Builds `where` part of SQL query.
+      *
+      * @param filters Filters to apply.
+      * @return Tuple containing the `where` string and a `List[Any]` of query parameters.
+      */
+    private def compileWhere(filters: Array[Filter]): (String, List[Any]) =
+        filters.foldLeft(("", List[Any]()))(buildSingleClause)
+
+    /**
+      * Adds a single where clause to `state` and returns the new state.
+      *
+      * @param state Current `where` state.
+      * @param clause Clause to add.
+      * @return `where` state with the given clause added.
+      */
+    private def buildSingleClause(state: (String, List[Any]), clause: Filter): (String, List[Any]) = {
+        val filterStr = state._1
+        val params = state._2
+
+        clause match {
+            case EqualTo(attr, value) ⇒ (addStrClause(filterStr, s"$attr = ?"), params :+ value)
+
+            case EqualNullSafe(attr, value) ⇒ (addStrClause(filterStr, s"($attr IS NULL OR $attr = ?)"), params :+ value)
+
+            case GreaterThan(attr, value) ⇒ (addStrClause(filterStr, s"$attr > ?"), params :+ value)
+
+            case GreaterThanOrEqual(attr, value) ⇒ (addStrClause(filterStr, s"$attr >= ?"), params :+ value)
+
+            case LessThan(attr, value) ⇒ (addStrClause(filterStr, s"$attr < ?"), params :+ value)
+
+            case LessThanOrEqual(attr, value) ⇒ (addStrClause(filterStr, s"$attr <= ?"), params :+ value)
+
+            case In(attr, values) ⇒ (addStrClause(filterStr, s"$attr IN (${values.map(_ ⇒ "?").mkString(",")})"), params ++ values)
+
+            case IsNull(attr) ⇒ (addStrClause(filterStr, s"$attr IS NULL"), params)
+
+            case IsNotNull(attr) ⇒ (addStrClause(filterStr, s"$attr IS NOT NULL"), params)
+
+            case And(left, right) ⇒
+                val leftClause = buildSingleClause(("", params), left)
+                val rightClause = buildSingleClause(("", leftClause._2), right)
+
+                (addStrClause(filterStr, s"${leftClause._1} AND ${rightClause._1}"), rightClause._2)
+
+            case Or(left, right) ⇒
+                val leftClause = buildSingleClause(("", params), left)
+                val rightClause = buildSingleClause(("", leftClause._2), right)
+
+                (addStrClause(filterStr, s"${leftClause._1} OR ${rightClause._1}"), rightClause._2)
+
+            case Not(child) ⇒
+                val innerClause = buildSingleClause(("", params), child)
+
+                (addStrClause(filterStr, s"NOT ${innerClause._1}"), innerClause._2)
+
+            case StringStartsWith(attr, value) ⇒
+                (addStrClause(filterStr, s"$attr LIKE ?"), params :+ (value + "%"))
+
+            case StringEndsWith(attr, value) ⇒
+                (addStrClause(filterStr, s"$attr LIKE ?"), params :+ ("%" + value))
+
+            case StringContains(attr, value) ⇒
+                (addStrClause(filterStr, s"$attr LIKE ?"), params :+ ("%" + value + "%"))
+        }
+    }
+
+    private def calcPartitions(filters: Array[Filter]): Array[Partition] = {
+        val cache = ic.ignite().cache[K, V](cacheName)
+
+        val ccfg = cache.getConfiguration(classOf[CacheConfiguration[K, V]])
+
+        if (ccfg.getCacheMode == CacheMode.REPLICATED) {
+            val serverNodes = ic.ignite().cluster().forCacheNodes(cacheName).forServers().nodes()
+
+            Array(IgniteDataFramePartition(0, serverNodes.head, Stream.from(0).take(1024).toList))
+        }
+        else {
+            val aff = ic.ignite().affinity(cacheName)
+
+            val parts = aff.partitions()
+
+            val nodesToParts = (0 until parts).foldLeft(Map[ClusterNode, ArrayBuffer[Int]]()) {
+                case (nodeToParts, ignitePartIdx) ⇒
+                    val primary = aff.mapPartitionToPrimaryAndBackups(ignitePartIdx).head
+
+                    if (nodeToParts.contains(primary)) {
+                        nodeToParts(primary) += ignitePartIdx
+
+                        nodeToParts
+                    }
+                    else
+                        nodeToParts + (primary → ArrayBuffer[Int](ignitePartIdx))
+            }
+
+            val partitions = nodesToParts.zipWithIndex.map { case ((node, nodesParts), i) ⇒
+                IgniteDataFramePartition(i, node, nodesParts.toList)
+            }
+
+            partitions.toArray
+        }
+    }
+
+    /**
+      * Cache name for a table name.
+      */
+    private lazy val cacheName: String =
+        sqlCacheName(ic.ignite(), tableName)
+            .getOrElse(throw new IgniteException(s"Unknown table $tableName"))
+
+    /**
+      * Utility method to add a clause to the SQL WHERE string.
+      *
+      * @param filterStr Current filter string.
+      * @param clause Clause to add.
+      * @return Filter string.
+      */
+    private def addStrClause(filterStr: String, clause: String) =
+        if (filterStr.isEmpty)
+            clause
+        else
+            filterStr + " AND " + clause
+
+}
+
+object IgniteSQLRelation {
+    /**
+      * Converts an Apache Ignite table description (<code>QueryEntity</code>) to a Spark description (<code>StructType</code>).
+      *
+      * @param table Ignite table description.
+      * @return Spark table description.
+      */
+    def schema(table: QueryEntity): StructType = {
+        //Partition columns have to be at the end of the list.
+        //See `org.apache.spark.sql.catalyst.catalog.CatalogTable#partitionSchema`
+        val columns = table.getFields.toList.sortBy(c ⇒ isKeyColumn(table, c._1))
+
+        StructType(columns.map { case (name, dataType) ⇒
+            StructField(
+                name = name,
+                dataType = IgniteRDD.dataType(dataType, name),
+                nullable = !isKeyColumn(table, name),
+                metadata = Metadata.empty)
+        })
+    }
+
+    def apply[K, V](ic: IgniteContext, tableName: String, sqlContext: SQLContext): IgniteSQLRelation[K, V] =
+        new IgniteSQLRelation[K, V](ic, tableName)(sqlContext)
+}

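For readers new to the data source API: the relation above is what Spark instantiates when the Ignite format is selected, and its buildScan/compileWhere pair turns DataFrame predicates into a parametrized Ignite SQL query. A minimal usage sketch, assuming a running Ignite node, a SQL table city(id, name, population) created via CREATE TABLE, and an illustrative config path:

    import org.apache.ignite.spark.IgniteDataFrameSettings._
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
        .appName("IgniteSQLRelation sketch")
        .master("local")
        .getOrCreate()

    val cities = spark.read
        .format(FORMAT_IGNITE)                           // the "ignite" data source
        .option(OPTION_TABLE, "city")                    // Ignite SQL table to read
        .option(OPTION_CONFIG_FILE, "ignite-config.xml") // illustrative Spring config path
        .load()

    // Projection and predicate are pushed down by buildScan/compileWhere,
    // producing "SELECT name FROM city WHERE population > ?" with List(100000).
    cities.filter("population > 100000").select("name").show()
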
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
index f386f26..c843d61 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
@@ -28,9 +28,10 @@ class IgniteSqlRDD[R: ClassTag, T, K, V](
     ic: IgniteContext,
     cacheName: String,
     cacheCfg: CacheConfiguration[K, V],
-    qry: Query[T],
+    var qry: Query[T],
     conv: (T) ⇒ R,
-    keepBinary: Boolean
+    keepBinary: Boolean,
+    partitions: Array[Partition] = Array(IgnitePartition(0))
 ) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg, keepBinary) {
     override def compute(split: Partition, context: TaskContext): Iterator[R] = {
         val cur = ensureCache().query(qry)
@@ -40,7 +41,12 @@ class IgniteSqlRDD[R: ClassTag, T, K, V](
         new IgniteQueryIterator[T, R](cur.iterator(), conv)
     }
 
-    override protected def getPartitions: Array[Partition] = {
-        Array(new IgnitePartition(0))
-    }
+    override protected def getPartitions: Array[Partition] = partitions
+}
+
+object IgniteSqlRDD {
+    def apply[R: ClassTag, T, K, V](ic: IgniteContext, cacheName: String, cacheCfg: CacheConfiguration[K, V],
+        qry: Query[T], conv: (T) ⇒ R, keepBinary: Boolean,
+        partitions: Array[Partition] = Array(IgnitePartition(0))): IgniteSqlRDD[R, T, K, V] =
+        new IgniteSqlRDD(ic, cacheName, cacheCfg, qry, conv, keepBinary, partitions)
 }

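The new partitions parameter lets callers such as IgniteSQLDataFrameRDD map Spark partitions onto concrete Ignite partitions, while the default preserves the previous single-partition behavior. A rough sketch of the companion apply's call shape (ic is an IgniteContext assumed in scope; cache and table names are illustrative):

    import org.apache.ignite.cache.query.SqlFieldsQuery
    import scala.collection.JavaConversions._

    val qry = new SqlFieldsQuery("SELECT id, name FROM city WHERE population > ?")
    qry.setArgs(Long.box(100000L))

    val rdd = IgniteSqlRDD[Seq[Any], java.util.List[_], Long, String](
        ic,                                   // IgniteContext
        "SQL_PUBLIC_CITY",                    // cache backing the table (illustrative)
        null,                                 // cacheCfg: cache already exists
        qry,
        (row: java.util.List[_]) ⇒ row.toSeq, // convert one SQL row to a Scala Seq
        keepBinary = true)                    // partitions defaults to IgnitePartition(0)
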
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
new file mode 100644
index 0000000..815854c
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.commons.lang.StringUtils.equalsIgnoreCase
+import org.apache.ignite.{Ignite, IgniteException, IgniteState, Ignition}
+import org.apache.ignite.cache.QueryEntity
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.internal.util.lang.GridFunc.contains
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+
+import scala.collection.JavaConversions._
+
+package object impl {
+    /**
+      * Checks that the named Ignite instance exists.
+      * Throws IgniteException if it does not.
+      *
+      * @param gridName Name of grid.
+      */
+    def ensureIgnite(gridName: String): Unit =
+        if (!igniteExists(gridName))
+            throw new IgniteException(s"Ignite grid with name '$gridName' does not exist.")
+
+    /**
+      * @param gridName Name of grid.
+      * @return True if the named Ignite instance exists, false otherwise.
+      */
+    def igniteExists(gridName: String): Boolean =
+        if (gridName == "")
+            Ignition.state() == IgniteState.STARTED
+        else
+            Ignition.state(gridName) == IgniteState.STARTED
+
+    /**
+      * @param g Ignite.
+      * @return Name of the Ignite instance, or an empty string if the name is null.
+      */
+    def igniteName(g: Ignite): String =
+        if (g.name() != null)
+            g.name
+        else
+            ""
+
+    /**
+      * @param name Name of grid.
+      * @param default Default instance.
+      * @return Named grid instance if it exists, otherwise the default instance.
+      */
+    def igniteOrDefault(name: String, default: Ignite): Ignite =
+        if (name == SessionCatalog.DEFAULT_DATABASE) {
+            if (igniteExists(name))
+                ignite(name)
+            else
+                default
+        }
+        else
+            ignite(name)
+
+    /**
+      * @param gridName Name of grid.
+      * @return Named grid instance. If 'gridName' is empty, the unnamed instance is returned.
+      */
+    def ignite(gridName: String): Ignite =
+        if (gridName == "")
+            Ignition.ignite()
+        else
+            Ignition.ignite(gridName)
+
+    /**
+      * @param ignite Ignite instance.
+      * @param tabName Table name.
+      * @return True if the table exists, false otherwise.
+      */
+    def sqlTableExists(ignite: Ignite, tabName: String): Boolean =
+        sqlTableInfo(ignite, tabName).isDefined
+
+    /**
+      * @param ignite Ignite instance.
+      * @param tabName Table name.
+      * @return QueryEntity for a given table.
+      */
+    def igniteSQLTable(ignite: Ignite, tabName: String): Option[QueryEntity] =
+        sqlTableInfo[Any, Any](ignite, tabName).map(_._2)
+
+    /**
+      * @param ignite Ignite instance.
+      * @param tabName Table name.
+      * @return Cache name for given table.
+      */
+    def sqlCacheName(ignite: Ignite, tabName: String): Option[String] =
+        sqlTableInfo[Any, Any](ignite, tabName).map(_._1.getName)
+
+    /**
+      * @param ignite Ignite instance.
+      * @param tabName Table name.
+      * @tparam K Key class.
+      * @tparam V Value class.
+      * @return CacheConfiguration and QueryEntity for a given table.
+      */
+    private def sqlTableInfo[K, V](ignite: Ignite, tabName: String): Option[(CacheConfiguration[K, V], QueryEntity)] =
+        ignite.cacheNames().map { cacheName ⇒
+            val ccfg = ignite.cache[K, V](cacheName).getConfiguration(classOf[CacheConfiguration[K, V]])
+
+            val queryEntities = ccfg.getQueryEntities
+
+            queryEntities.find(_.getTableName.equalsIgnoreCase(tabName)).map(qe ⇒ (ccfg, qe))
+        }.find(_.isDefined).flatten
+
+    /**
+      * @param table Table.
+      * @param column Column name.
+      * @return `True` if column is key.
+      */
+    def isKeyColumn(table: QueryEntity, column: String): Boolean =
+        contains(table.getKeyFields, column) || equalsIgnoreCase(table.getKeyFieldName, column)
+}

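Together these helpers form the lookup path from a Spark table name to the Ignite cache and QueryEntity behind it, shared by the relation above and the catalog below. A small sketch of their use, assuming an already started unnamed Ignite instance holding a SQL table city (names are illustrative):

    import org.apache.ignite.Ignition
    import org.apache.ignite.spark.impl._

    val ignite = Ignition.ignite()
    val cacheName = sqlCacheName(ignite, "city")

    igniteSQLTable(ignite, "city") match {
        case Some(entity) ⇒
            // The cache backing the table and its key columns.
            println(s"cache=${cacheName.get} keyFields=${entity.getKeyFields}")
        case None ⇒
            println("table 'city' not found")
    }
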
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
new file mode 100644
index 0000000..9c1fe0c
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.ignite
+
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE
+import org.apache.ignite.spark.IgniteContext
+import org.apache.ignite.spark.IgniteDataFrameSettings._
+import org.apache.ignite.spark.impl.IgniteSQLRelation.schema
+import org.apache.ignite.{Ignite, Ignition}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.types.StructType
+import org.apache.ignite.spark.impl._
+import org.apache.spark.sql.ignite.IgniteExternalCatalog.OPTION_GRID
+
+import scala.collection.JavaConversions._
+
+/**
+  * External catalog implementation providing transparent access to SQL tables existing in Ignite.
+  *
+  * @param defaultIgniteContext Ignite context providing access to the Ignite instance. If <code>None</code> is passed, the no-name Ignite instance is used.
+  */
+private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext) extends ExternalCatalog {
+    /**
+      * Default Ignite instance.
+      */
+    @transient private var default: Ignite = defaultIgniteContext.ignite
+
+    /**
+      * @param db Ignite instance name.
+      * @return Description of Ignite instance.
+      */
+    override def getDatabase(db: String): CatalogDatabase =
+        CatalogDatabase(db, db, null, Map.empty)
+
+    /**
+      * Checks that an Ignite instance with the provided name exists.
+      * If <code>db == SessionCatalog.DEFAULT_DATABASE</code>, checks for the default Ignite instance.
+      *
+      * @param db Ignite instance name or <code>SessionCatalog.DEFAULT_DATABASE</code>.
+      * @return True if the Ignite instance exists.
+      */
+    override def databaseExists(db: String): Boolean =
+        db == SessionCatalog.DEFAULT_DATABASE || igniteExists(db)
+
+    /**
+      * @return List of all known Ignite instance names.
+      */
+    override def listDatabases(): Seq[String] =
+        Ignition.allGrids().map(igniteName)
+
+    /**
+      * @param pattern Pattern to filter database names.
+      * @return List of all known Ignite instance names filtered by pattern.
+      */
+    override def listDatabases(pattern: String): Seq[String] =
+        StringUtils.filterPattern(listDatabases(), pattern)
+
+    /**
+      * Sets default Ignite instance.
+      *
+      * @param db Name of Ignite instance.
+      */
+    override def setCurrentDatabase(db: String): Unit = {
+        ensureIgnite(db)
+
+        default = ignite(db)
+    }
+
+    /** @inheritdoc */
+    override def getTable(db: String, table: String): CatalogTable = getTableOption(db, table).get
+
+    /** @inheritdoc */
+    override def getTableOption(db: String, tabName: String): Option[CatalogTable] = {
+        val ignite = igniteOrDefault(db, default)
+
+        val gridName = igniteName(ignite)
+
+        igniteSQLTable(ignite, tabName) match {
+            case Some(table) ⇒
+                val tableName = table.getTableName
+
+                Some(new CatalogTable(
+                    identifier = new TableIdentifier(tableName, Some(gridName)),
+                    tableType = CatalogTableType.EXTERNAL,
+                    storage = CatalogStorageFormat(
+                        locationUri = None,
+                        inputFormat = Some(FORMAT_IGNITE),
+                        outputFormat = Some(FORMAT_IGNITE),
+                        serde = None,
+                        compressed = false,
+                        properties = Map(
+                            OPTION_GRID → gridName,
+                            OPTION_TABLE → tableName)
+                    ),
+                    schema = schema(table),
+                    provider = Some(FORMAT_IGNITE),
+                    partitionColumnNames =
+                        if (table.getKeyFields != null)
+                            table.getKeyFields.toSeq
+                        else
+                            Seq(table.getKeyFieldName),
+                    bucketSpec = None))
+            case None ⇒ None
+        }
+    }
+
+    /** @inheritdoc */
+    override def tableExists(db: String, table: String): Boolean =
+        sqlTableExists(igniteOrDefault(db, default), table)
+
+    /** @inheritdoc */
+    override def listTables(db: String): Seq[String] = listTables(db, ".*")
+
+    /** @inheritdoc */
+    override def listTables(db: String, pattern: String): Seq[String] = {
+        val ignite = igniteOrDefault(db, default)
+
+        ignite.cacheNames.flatMap { name ⇒
+            val cache = ignite.cache[Any, Any](name)
+
+            val ccfg = cache.getConfiguration(classOf[CacheConfiguration[Any, Any]])
+
+            ccfg.getQueryEntities.map(_.getTableName)
+        }.toSeq
+    }
+
+    /** @inheritdoc */
+    override def loadTable(db: String, table: String,
+        loadPath: String, isOverwrite: Boolean, isSrcLocal: Boolean): Unit = { /* no-op */ }
+
+    /** @inheritdoc */
+    override def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition = null
+
+    /** @inheritdoc */
+    override def getPartitionOption(db: String, table: String,
+        spec: TablePartitionSpec): Option[CatalogTablePartition] = None
+
+    /** @inheritdoc */
+    override def listPartitionNames(db: String, table: String, partialSpec: Option[TablePartitionSpec]): Seq[String] = {
+        val ignite = igniteOrDefault(db, default)
+
+        sqlCacheName(ignite, table).map { cacheName ⇒
+            val parts = ignite.affinity(cacheName).partitions()
+
+            (0 until parts).map(_.toString)
+        }.getOrElse(Seq.empty)
+    }
+
+    /** @inheritdoc */
+    override def listPartitions(db: String, table: String,
+        partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
+        val ignite = igniteOrDefault(db, default)
+
+        val partitionNames = listPartitionNames(db, table, partialSpec)
+
+        if (partitionNames.isEmpty)
+            Seq.empty
+        else {
+            val cacheName = sqlCacheName(ignite, table).get
+
+            val aff = ignite.affinity[Any](cacheName)
+
+            partitionNames.map { name ⇒
+                val nodes = aff.mapPartitionToPrimaryAndBackups(name.toInt)
+
+                if (nodes.isEmpty)
+                    throw new AnalysisException(s"Nodes for partition are empty [grid=${ignite.name},table=$table,partition=$name].")
+
+                CatalogTablePartition(
+                    Map[String, String](
+                        "name" → name,
+                        "primary" → nodes.head.id.toString,
+                        "backups" → nodes.tail.map(_.id.toString).mkString(",")
+                    ),
+                    CatalogStorageFormat.empty
+                )
+            }
+        }
+    }
+
+    /** @inheritdoc */
+    override def listPartitionsByFilter(db: String,
+        table: String,
+        predicates: Seq[Expression],
+        defaultTimeZoneId: String): Seq[CatalogTablePartition] =
+        listPartitions(db, table, None)
+
+    /** @inheritdoc */
+    override def loadPartition(db: String,
+        table: String,
+        loadPath: String,
+        partition: TablePartitionSpec, isOverwrite: Boolean,
+        inheritTableSpecs: Boolean, isSrcLocal: Boolean): Unit = { /* no-op */ }
+
+    /** @inheritdoc */
+    override def loadDynamicPartitions(db: String, table: String,
+        loadPath: String,
+        partition: TablePartitionSpec, replace: Boolean,
+        numDP: Int): Unit = { /* no-op */ }
+
+    /** @inheritdoc */
+    override def getFunction(db: String, funcName: String): CatalogFunction =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override def functionExists(db: String, funcName: String): Boolean = false
+
+    /** @inheritdoc */
+    override def listFunctions(db: String, pattern: String): Seq[String] = Seq.empty[String]
+
+    /** @inheritdoc */
+    override def alterDatabase(dbDefinition: CatalogDatabase): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override def alterTable(tableDefinition: CatalogTable): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override def alterTableSchema(db: String, table: String, schema: StructType): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override protected def doCreateFunction(db: String, funcDefinition: CatalogFunction): Unit = { /* no-op */ }
+
+    /** @inheritdoc */
+    override protected def doDropFunction(db: String, funcName: String): Unit = { /* no-op */ }
+
+    /** @inheritdoc */
+    override protected def doRenameFunction(db: String, oldName: String, newName: String): Unit = { /* no-op */ }
+
+    /** @inheritdoc */
+    override protected def doCreateDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override protected def doDropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override protected def doCreateTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override protected def doDropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override protected def doRenameTable(db: String, oldName: String, newName: String): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override def createPartitions(db: String, table: String,
+        parts: Seq[CatalogTablePartition],
+        ignoreIfExists: Boolean): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override def dropPartitions(db: String, table: String,
+        parts: Seq[TablePartitionSpec],
+        ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override def renamePartitions(db: String, table: String,
+        specs: Seq[TablePartitionSpec],
+        newSpecs: Seq[TablePartitionSpec]): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override def alterPartitions(db: String, table: String,
+        parts: Seq[CatalogTablePartition]): Unit =
+        throw new UnsupportedOperationException("unsupported")
+}
+
+object IgniteExternalCatalog {
+    /**
+      * Config option to specify named grid instance to connect when loading data.
+      * For internal use only.
+      *
+      * @see [[org.apache.ignite.Ignite#name()]]
+      */
+    private[apache] val OPTION_GRID = "grid"
+}

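The catalog is not normally constructed by hand; it is installed through IgniteSparkSession (added elsewhere in this change), after which Ignite SQL tables resolve without any registration step. A minimal sketch (config path illustrative):

    import org.apache.spark.sql.ignite.IgniteSparkSession

    val spark = IgniteSparkSession.builder()
        .appName("Ignite catalog sketch")
        .master("local")
        .igniteConfig("ignite-config.xml") // illustrative Spring config path
        .getOrCreate()

    spark.catalog.listTables().show()      // tables discovered straight from Ignite caches
    spark.sql("SELECT name FROM city WHERE population > 100000").show()
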
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSharedState.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSharedState.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSharedState.scala
new file mode 100644
index 0000000..20a6fdf
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSharedState.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.ignite
+
+import org.apache.ignite.spark.IgniteContext
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, ExternalCatalogEvent, ExternalCatalogEventListener}
+import org.apache.spark.sql.internal.SharedState
+
+/**
+  * Shared state that overrides the default external catalog with IgniteExternalCatalog.
+  */
+private[ignite] class IgniteSharedState (
+    igniteContext: IgniteContext,
+    sparkContext: SparkContext) extends SharedState(sparkContext) {
+    /** @inheritdoc */
+    override lazy val externalCatalog: ExternalCatalog = {
+        val externalCatalog = new IgniteExternalCatalog(igniteContext)
+
+        externalCatalog.addListener(new ExternalCatalogEventListener {
+            override def onEvent(event: ExternalCatalogEvent): Unit = {
+                sparkContext.listenerBus.post(event)
+            }
+        })
+
+        externalCatalog
+    }
+}
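
This class only takes effect once a session substitutes it for the default SharedState; IgniteSparkSession does so with an override along these lines (a sketch, not the verbatim source):

    // Inside org.apache.spark.sql.ignite.IgniteSparkSession:
    @transient override lazy val sharedState: SharedState =
        new IgniteSharedState(ic, sparkContext)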