You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2017/12/29 21:13:36 UTC
[2/2] ignite git commit: IGNITE-3084 - Ignite data source
implementation for Spark data frame API - Fixes #2742
IGNITE-3084 - Ignite data source implementation for Spark data frame API - Fixes #2742
Signed-off-by: Valentin Kulichenko <va...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/44428f31
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/44428f31
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/44428f31
Branch: refs/heads/master
Commit: 44428f31817f1f67823373dacaf178228c02ed6c
Parents: a3b8324
Author: Nikolay Izhikov <ni...@gmail.com>
Authored: Fri Dec 29 13:13:26 2017 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Fri Dec 29 13:13:26 2017 -0800
----------------------------------------------------------------------
.../examples/spark/IgniteCatalogExample.scala | 125 +++++++
.../examples/spark/IgniteDataFrameExample.scala | 146 ++++++++
.../apache/ignite/examples/spark/package.scala | 37 ++
.../spark/examples/IgniteDataFrameSelfTest.java | 44 +++
.../IgniteExamplesSparkSelfTestSuite.java | 2 +
modules/spark-2.10/pom.xml | 205 +++++++++--
modules/spark/pom.xml | 31 +-
....apache.spark.sql.sources.DataSourceRegister | 1 +
.../org/apache/ignite/spark/IgniteContext.scala | 44 ++-
.../ignite/spark/IgniteDataFrameSettings.scala | 56 +++
.../org/apache/ignite/spark/IgniteRDD.scala | 65 ++--
.../spark/impl/IgniteDataFramePartition.scala | 31 ++
.../ignite/spark/impl/IgnitePartition.scala | 2 +-
.../spark/impl/IgniteRelationProvider.scala | 90 +++++
.../spark/impl/IgniteSQLDataFrameRDD.scala | 82 +++++
.../ignite/spark/impl/IgniteSQLRelation.scala | 227 ++++++++++++
.../apache/ignite/spark/impl/IgniteSqlRDD.scala | 16 +-
.../org/apache/ignite/spark/impl/package.scala | 132 +++++++
.../sql/ignite/IgniteExternalCatalog.scala | 305 +++++++++++++++++
.../spark/sql/ignite/IgniteSharedState.scala | 43 +++
.../spark/sql/ignite/IgniteSparkSession.scala | 343 +++++++++++++++++++
modules/spark/src/test/resources/cities.json | 3 +
.../src/test/resources/ignite-spark-config.xml | 64 ++++
.../ignite/spark/AbstractDataFrameSpec.scala | 164 +++++++++
.../apache/ignite/spark/IgniteCatalogSpec.scala | 160 +++++++++
.../spark/IgniteDataFrameSchemaSpec.scala | 88 +++++
.../ignite/spark/IgniteDataFrameSuite.scala | 30 ++
.../spark/IgniteDataFrameWrongConfigSpec.scala | 53 +++
.../ignite/spark/IgniteSQLDataFrameSpec.scala | 288 ++++++++++++++++
parent/pom.xml | 7 +-
30 files changed, 2805 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteCatalogExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteCatalogExample.scala b/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteCatalogExample.scala
new file mode 100644
index 0000000..4382e10
--- /dev/null
+++ b/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteCatalogExample.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.spark
+
+import java.lang.{Long ⇒ JLong}
+
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.{Ignite, Ignition}
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.sql.ignite.IgniteSparkSession
+
+/**
+ * Example application showing the Ignite implementation of the Spark SQL [[org.apache.spark.sql.catalog.Catalog]].
+ * The catalog resolves SQL tables created in Ignite, so Spark SQL can query them by name without registering views.
+ */
+object IgniteCatalogExample extends App {
+ /**
+ * Path to the Ignite configuration file; used to start the server node and to configure the Spark session.
+ */
+ private val CONFIG = "examples/config/example-ignite.xml"
+
+ /**
+ * Name of the cache through which the example's DDL and DML statements are executed.
+ */
+ private val CACHE_NAME = "testCache"
+
+ //Starting the Ignite server node and populating the `city` and `person` tables.
+ val ignite = setupServerAndData
+
+ closeAfter(ignite) { ignite ⇒
+ //Creating the Ignite-specific implementation of the Spark session.
+ val igniteSession = IgniteSparkSession.builder()
+ .appName("Spark Ignite catalog example")
+ .master("local")
+ .config("spark.executor.instances", "2")
+ .igniteConfig(CONFIG)
+ .getOrCreate()
+
+ //Keeping only ERROR output, except for INFO messages from Ignite itself.
+ Logger.getRootLogger.setLevel(Level.ERROR)
+ Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
+
+ //Listing the tables resolved by the Ignite catalog.
+ igniteSession.catalog.listTables().show()
+
+ //Showing the columns of the `person` table.
+ igniteSession.catalog.listColumns("person").show()
+
+ //Showing the columns of the `city` table.
+ igniteSession.catalog.listColumns("city").show()
+
+ //Selecting data through the Spark SQL engine.
+ val df = igniteSession.sql("SELECT * FROM person WHERE CITY_ID = 2")
+
+ df.printSchema()
+ df.show()
+
+ //Joining `person` with `city` through the Spark SQL engine.
+ val df2 = igniteSession.sql("SELECT * FROM person p JOIN city c ON c.ID = p.CITY_ID WHERE p.CITY_ID = 2")
+
+ df2.printSchema()
+ df2.show()
+ }
+
+ /**
+ * Starts an Ignite server node, creates the `city` and `person` tables and fills them with sample rows.
+ *
+ * @return The started Ignite server node; closing it is left to the caller.
+ */
+ def setupServerAndData: Ignite = {
+ //Starting the Ignite server node from the example configuration.
+ val ignite = Ignition.start(CONFIG)
+
+ //Creating a cache with SQL schema PUBLIC; it serves as the entry point for the SQL statements below.
+ val ccfg = new CacheConfiguration[Int, Int](CACHE_NAME).setSqlSchema("PUBLIC")
+
+ val cache = ignite.getOrCreateCache(ccfg)
+
+ //Creating tables: `city` uses the replicated template, `person` has one backup and city_id as affinity key.
+ cache.query(new SqlFieldsQuery(
+ "CREATE TABLE city (id LONG PRIMARY KEY, name VARCHAR) WITH \"template=replicated\"")).getAll
+
+ cache.query(new SqlFieldsQuery(
+ "CREATE TABLE person (id LONG, name VARCHAR, city_id LONG, PRIMARY KEY (id, city_id)) " +
+ "WITH \"backups=1, affinityKey=city_id\"")).getAll
+
+ cache.query(new SqlFieldsQuery("CREATE INDEX on Person (city_id)")).getAll
+
+ println("Created database objects.")
+
+ //Inserting sample rows; ids are passed as java.lang.Long values.
+ var qry = new SqlFieldsQuery("INSERT INTO city (id, name) VALUES (?, ?)")
+
+ cache.query(qry.setArgs(1L.asInstanceOf[JLong], "Forest Hill")).getAll
+ cache.query(qry.setArgs(2L.asInstanceOf[JLong], "Denver")).getAll
+ cache.query(qry.setArgs(3L.asInstanceOf[JLong], "St. Petersburg")).getAll
+
+ qry = new SqlFieldsQuery("INSERT INTO person (id, name, city_id) values (?, ?, ?)")
+
+ cache.query(qry.setArgs(1L.asInstanceOf[JLong], "John Doe", 3L.asInstanceOf[JLong])).getAll
+ cache.query(qry.setArgs(2L.asInstanceOf[JLong], "Jane Roe", 2L.asInstanceOf[JLong])).getAll
+ cache.query(qry.setArgs(3L.asInstanceOf[JLong], "Mary Major", 1L.asInstanceOf[JLong])).getAll
+ cache.query(qry.setArgs(4L.asInstanceOf[JLong], "Richard Miles", 2L.asInstanceOf[JLong])).getAll
+
+ println("Populated data.")
+
+ ignite
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameExample.scala b/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameExample.scala
new file mode 100644
index 0000000..ae84915
--- /dev/null
+++ b/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameExample.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.spark
+
+import java.lang.{Long ⇒ JLong, String ⇒ JString}
+
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.{Ignite, Ignition}
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+import org.apache.ignite.spark.IgniteDataFrameSettings._
+
+/**
+ * Example application showing how to read Ignite SQL tables through the Spark DataFrame API.
+ */
+object IgniteDataFrameExample extends App {
+ /**
+ * Path to the Ignite configuration file; used to start the server node and passed to the `ignite` data source.
+ */
+ private val CONFIG = "examples/config/example-ignite.xml"
+
+ /**
+ * Name of the cache through which the example's DDL and DML statements are executed.
+ */
+ private val CACHE_NAME = "testCache"
+
+ //Starting the Ignite server node and populating the `city` and `person` tables.
+ val ignite = setupServerAndData
+
+ closeAfter(ignite) { ignite ⇒
+ //Creating a plain Spark session; it is implicit so the example methods below receive it.
+ implicit val spark = SparkSession.builder()
+ .appName("Spark Ignite data sources example")
+ .master("local")
+ .config("spark.executor.instances", "2")
+ .getOrCreate()
+
+ //Keeping only ERROR output, except for INFO messages from Ignite itself.
+ Logger.getRootLogger.setLevel(Level.ERROR)
+ Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
+
+ //Executing the examples.
+
+ sparkDSLExample
+
+ nativeSparkSqlExample
+ }
+
+ /**
+ * Reads the `person` table through the Ignite data source
+ * and filters it with the Spark DSL.
+ *
+ * @param spark Spark session used to create the DataFrame.
+ */
+ def sparkDSLExample(implicit spark: SparkSession): Unit = {
+ val igniteDF = spark.read
+ .format(FORMAT_IGNITE) //Data source type.
+ .option(OPTION_TABLE, "person") //Table to read.
+ .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
+ .load()
+ .filter(col("id") >= 2) //Filter clause.
+ .filter(col("name") like "%M%") //Another filter clause.
+
+ igniteDF.printSchema() //Printing query schema to console.
+ igniteDF.show() //Printing query results to console.
+ }
+
+ /**
+ * Loads the `person` table through the Ignite data source,
+ * registers it as the temporary Spark view `person`
+ * and queries that view with Spark SQL.
+ *
+ * @param spark Spark session used to create the DataFrame and run the query.
+ */
+ def nativeSparkSqlExample(implicit spark: SparkSession): Unit = {
+ val df = spark.read
+ .format(FORMAT_IGNITE) //Data source type.
+ .option(OPTION_TABLE, "person") //Table to read.
+ .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
+ .load()
+
+ //Registering the DataFrame as a temporary Spark view.
+ df.createOrReplaceTempView("person")
+
+ //Selecting data from Ignite through the Spark SQL engine.
+ val igniteDF = spark.sql("SELECT * FROM person WHERE id >= 2 AND name = 'Mary Major'")
+
+ igniteDF.printSchema() //Printing query schema to console.
+ igniteDF.show() //Printing query results to console.
+ }
+
+ def setupServerAndData: Ignite = {
+ //Starting the Ignite server node from the example configuration.
+ val ignite = Ignition.start(CONFIG)
+
+ //Creating a cache with SQL schema PUBLIC; it serves as the entry point for the SQL statements below.
+ val ccfg = new CacheConfiguration[JLong, JString](CACHE_NAME).setSqlSchema("PUBLIC")
+
+ val cache = ignite.getOrCreateCache(ccfg)
+
+ //Creating tables: `city` uses the replicated template, `person` has one backup and city_id as affinity key.
+ cache.query(new SqlFieldsQuery(
+ "CREATE TABLE city (id LONG PRIMARY KEY, name VARCHAR) WITH \"template=replicated\"")).getAll
+
+ cache.query(new SqlFieldsQuery(
+ "CREATE TABLE person (id LONG, name VARCHAR, city_id LONG, PRIMARY KEY (id, city_id)) " +
+ "WITH \"backups=1, affinityKey=city_id\"")).getAll
+
+ cache.query(new SqlFieldsQuery("CREATE INDEX on Person (city_id)")).getAll
+
+ //Inserting sample rows into both tables; ids are passed as java.lang.Long values.
+ var qry = new SqlFieldsQuery("INSERT INTO city (id, name) VALUES (?, ?)")
+
+ cache.query(qry.setArgs(1L.asInstanceOf[JLong], "Forest Hill")).getAll
+ cache.query(qry.setArgs(2L.asInstanceOf[JLong], "Denver")).getAll
+ cache.query(qry.setArgs(3L.asInstanceOf[JLong], "St. Petersburg")).getAll
+
+ qry = new SqlFieldsQuery("INSERT INTO person (id, name, city_id) values (?, ?, ?)")
+
+ cache.query(qry.setArgs(1L.asInstanceOf[JLong], "John Doe", 3L.asInstanceOf[JLong])).getAll
+ cache.query(qry.setArgs(2L.asInstanceOf[JLong], "Jane Roe", 2L.asInstanceOf[JLong])).getAll
+ cache.query(qry.setArgs(3L.asInstanceOf[JLong], "Mary Major", 1L.asInstanceOf[JLong])).getAll
+ cache.query(qry.setArgs(4L.asInstanceOf[JLong], "Richard Miles", 2L.asInstanceOf[JLong])).getAll
+
+ println("Populated data.")
+
+ ignite
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/examples/src/main/spark/org/apache/ignite/examples/spark/package.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/spark/org/apache/ignite/examples/spark/package.scala b/examples/src/main/spark/org/apache/ignite/examples/spark/package.scala
new file mode 100644
index 0000000..a877d82
--- /dev/null
+++ b/examples/src/main/spark/org/apache/ignite/examples/spark/package.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.examples
+
+/** Helpers shared by the Ignite Spark examples.
+ */
+package object spark {
+ /**
+ * Runs a closure against an `AutoCloseable` resource and returns the closure's result.
+ * The resource is closed in a `finally` block, so it is closed even if the closure throws;
+ * if `close` throws as well, that exception replaces the one thrown by the closure.
+ */
+ object closeAfter {
+ def apply[R <: AutoCloseable, T](r: R)(c: (R) ⇒ T) = {
+ try {
+ c(r)
+ }
+ finally {
+ r.close
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java
----------------------------------------------------------------------
diff --git a/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java b/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java
new file mode 100644
index 0000000..b18d870
--- /dev/null
+++ b/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples;
+
+import org.apache.ignite.examples.spark.IgniteCatalogExample;
+import org.apache.ignite.examples.spark.IgniteDataFrameExample;
+import org.junit.Test;
+
+/** Runs the Spark catalog and DataFrame examples as JUnit tests.
+ */
+public class IgniteDataFrameSelfTest {
+ static final String[] EMPTY_ARGS = new String[0];
+
+ /** Runs {@code IgniteCatalogExample.main} without command-line arguments.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testCatalogExample() throws Exception {
+ IgniteCatalogExample.main(EMPTY_ARGS);
+ }
+
+ /** Runs {@code IgniteDataFrameExample.main} without command-line arguments.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDataFrameExample() throws Exception {
+ IgniteDataFrameExample.main(EMPTY_ARGS);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/examples/src/test/spark/org/apache/ignite/spark/testsuites/IgniteExamplesSparkSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/examples/src/test/spark/org/apache/ignite/spark/testsuites/IgniteExamplesSparkSelfTestSuite.java b/examples/src/test/spark/org/apache/ignite/spark/testsuites/IgniteExamplesSparkSelfTestSuite.java
index 73b286a..9e1b2df 100644
--- a/examples/src/test/spark/org/apache/ignite/spark/testsuites/IgniteExamplesSparkSelfTestSuite.java
+++ b/examples/src/test/spark/org/apache/ignite/spark/testsuites/IgniteExamplesSparkSelfTestSuite.java
@@ -18,6 +18,7 @@
package org.apache.ignite.spark.testsuites;
import junit.framework.TestSuite;
+import org.apache.ignite.spark.examples.IgniteDataFrameSelfTest;
import org.apache.ignite.spark.examples.SharedRDDExampleSelfTest;
import org.apache.ignite.testframework.GridTestUtils;
@@ -40,6 +41,7 @@ public class IgniteExamplesSparkSelfTestSuite extends TestSuite {
TestSuite suite = new TestSuite("Ignite Examples Test Suite");
suite.addTest(new TestSuite(SharedRDDExampleSelfTest.class));
+ suite.addTest(new TestSuite(IgniteDataFrameSelfTest.class));
return suite;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark-2.10/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark-2.10/pom.xml b/modules/spark-2.10/pom.xml
index 326c8e1..b973afb 100644
--- a/modules/spark-2.10/pom.xml
+++ b/modules/spark-2.10/pom.xml
@@ -56,6 +56,12 @@
</dependency>
<dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala210.library.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
@@ -63,7 +69,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-unsafe_2.10</artifactId>
+ <artifactId>spark-catalyst_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
@@ -74,9 +80,9 @@
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- <version>${jackson2.version}</version>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-network-common_2.10</artifactId>
+ <version>${spark.version}</version>
</dependency>
<dependency>
@@ -86,73 +92,212 @@
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${spark.hadoop.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson2.version}</version>
</dependency>
<dependency>
+ <groupId>org.json4s</groupId>
+ <artifactId>json4s-core_2.10</artifactId>
+ <version>3.5.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-indexing</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spring</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-log4j</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_2.10</artifactId>
+ <version>2.2.6</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-tags_2.10</artifactId>
+ <artifactId>spark-unsafe_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-catalyst_2.10</artifactId>
+ <artifactId>spark-launcher_2.10</artifactId>
<version>${spark.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_2.10</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.5</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>4.0.29.Final</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ <version>2.20</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill_2.10</artifactId>
+ <version>0.8.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>3.0.2</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-json</artifactId>
+ <version>3.1.2</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.tomcat</groupId>
+ <artifactId>tomcat-servlet-api</artifactId>
+ <version>8.0.23</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet-core</artifactId>
+ <version>2.25</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson2.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-scala_2.10</artifactId>
+ <version>${jackson2.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.xbean</groupId>
+ <artifactId>xbean-asm5-shaded</artifactId>
+ <version>4.5</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>lz4</artifactId>
+ <version>1.3.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
- <groupId>org.json4s</groupId>
- <artifactId>json4s-core_2.11</artifactId>
- <version>3.5.0</version>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>janino</artifactId>
+ <version>3.0.0</version>
+ <scope>test</scope>
</dependency>
- <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>commons-compiler</artifactId>
+ <version>3.0.0</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest_2.10</artifactId>
- <version>2.2.2</version>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ <version>4.5.3</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-indexing</artifactId>
- <version>${project.version}</version>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <version>1.7.16</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava14.version}</version>
+ </dependency>
</dependencies>
<build>
<sourceDirectory>../spark/src/main/scala</sourceDirectory>
+ <testSourceDirectory>../spark/src/test/java</testSourceDirectory>
<resources>
<resource>
- <directory>../spark/src/main/scala</directory>
- <excludes>
- <exclude>**/*.scala</exclude>
- </excludes>
+ <directory>../spark/src/main/resources</directory>
</resource>
</resources>
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index 3cd2e1c..458abd8 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -52,7 +52,13 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
- <version>2.11.8</version>
+ <version>${scala211.library.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala211.library.version}</version>
</dependency>
<dependency>
@@ -88,7 +94,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
+ <version>${spark.hadoop.version}</version>
</dependency>
<dependency>
@@ -156,12 +162,6 @@
</dependency>
<dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-unsafe_2.10</artifactId>
- <version>${spark.version}</version>
- </dependency>
-
- <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
@@ -265,12 +265,27 @@
<version>3.0.0</version>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
<version>3.0.0</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ <version>4.5.3</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <version>1.7.16</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/modules/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..8304662
--- /dev/null
+++ b/modules/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.apache.ignite.spark.impl.IgniteRelationProvider
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index edbf2be..ba1b244 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -21,9 +21,11 @@ import org.apache.ignite._
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.internal.IgnitionEx
import org.apache.ignite.internal.util.IgniteUtils
+import org.apache.ignite.spark.IgniteContext.setIgniteHome
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
import org.apache.log4j.Logger
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
/**
* Ignite context.
@@ -60,6 +62,14 @@ class IgniteContext(
// Make sure to start Ignite on context creation.
ignite()
+ //Stop local ignite instance on application end.
+ //Instances on workers will be stopped with executor stop(jvm exit).
+ sparkContext.addSparkListener(new SparkListener {
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+ close()
+ }
+ })
+
/**
* Creates an instance of IgniteContext with the given spring configuration.
*
@@ -129,15 +139,7 @@ class IgniteContext(
* @return
*/
def ignite(): Ignite = {
- val home = IgniteUtils.getIgniteHome
-
- if (home == null && igniteHome != null) {
- Logging.log.info("Setting IGNITE_HOME from driver not as it is not available on this worker: " + igniteHome)
-
- IgniteUtils.nullifyHomeDirectory()
-
- System.setProperty(IgniteSystemProperties.IGNITE_HOME, igniteHome)
- }
+ setIgniteHome(igniteHome)
val igniteCfg = cfgClo()
@@ -159,7 +161,7 @@ class IgniteContext(
* Stops supporting ignite instance. If ignite instance has been already stopped, this operation will be
* a no-op.
*/
- def close(shutdownIgniteOnWorkers: Boolean = false) = {
+ def close(shutdownIgniteOnWorkers: Boolean = false): Unit = {
// additional check if called from driver
if (sparkContext != null && shutdownIgniteOnWorkers) {
// Get required number of executors with default equals to number of available executors.
@@ -180,7 +182,25 @@ class IgniteContext(
private def doClose() = {
val igniteCfg = cfgClo()
- Ignition.stop(igniteCfg.getIgniteInstanceName, false)
+ if (Ignition.state(igniteCfg.getIgniteInstanceName) == IgniteState.STARTED)
+ Ignition.stop(igniteCfg.getIgniteInstanceName, false)
+ }
+}
+
+object IgniteContext {
+ def apply(sparkContext: SparkContext, cfgF: () ⇒ IgniteConfiguration, standalone: Boolean = true): IgniteContext =
+ new IgniteContext(sparkContext, cfgF, standalone)
+ //Sets IGNITE_HOME to the given value when no Ignite home is resolved locally (e.g. on a worker).
+ def setIgniteHome(igniteHome: String): Unit = {
+ val home = IgniteUtils.getIgniteHome
+
+ if (home == null && igniteHome != null) {
+ Logging.log.info("Setting IGNITE_HOME from driver not as it is not available on this worker: " + igniteHome)
+
+ IgniteUtils.nullifyHomeDirectory()
+
+ System.setProperty(IgniteSystemProperties.IGNITE_HOME, igniteHome)
+ }
}
}
@@ -189,7 +209,7 @@ class IgniteContext(
*
* @param clo Closure to wrap.
*/
-private class Once(clo: () ⇒ IgniteConfiguration) extends Serializable {
+class Once(clo: () ⇒ IgniteConfiguration) extends Serializable {
@transient @volatile var res: IgniteConfiguration = null
def apply(): IgniteConfiguration = {
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
new file mode 100644
index 0000000..4261032
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+/**
+ * Data source format name and option names used to read Apache Ignite SQL tables
+ * through the Spark Data Frame API.
+ *
+ * The alternative `grid` connection option is defined in `IgniteExternalCatalog`.
+ */
+object IgniteDataFrameSettings {
+  /**
+   * Name of DataSource format for loading data from Apache Ignite.
+   */
+  val FORMAT_IGNITE: String = "ignite"
+
+  /**
+   * Config option to specify path to ignite config file.
+   * Config from this file will be used to connect to existing Ignite cluster.
+   *
+   * @note Ignite nodes started to execute Spark tasks are forced into client mode.
+   *
+   * @example {{{
+   * val igniteDF = spark.read.format(FORMAT_IGNITE)
+   *   .option(OPTION_CONFIG_FILE, CONFIG_FILE)
+   *   // other options ...
+   *   .load()
+   * }}}
+   */
+  val OPTION_CONFIG_FILE: String = "config"
+
+  /**
+   * Config option to specify Ignite SQL table name to load data from.
+   *
+   * @example {{{
+   * val igniteDF = spark.read.format(FORMAT_IGNITE)
+   *   // other options ...
+   *   .option(OPTION_TABLE, "mytable")
+   *   .load()
+   * }}}
+   *
+   * @see [[org.apache.ignite.cache.QueryEntity#tableName]]
+   */
+  val OPTION_TABLE: String = "table"
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 78e2223..fce47a6 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -26,10 +26,10 @@ import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata
import org.apache.ignite.lang.IgniteUuid
import org.apache.ignite.spark.impl._
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode
+import org.apache.spark._
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types._
import org.apache.spark.sql._
-import org.apache.spark._
+import org.apache.spark.sql.types._
import scala.collection.JavaConversions._
@@ -314,17 +314,35 @@ class IgniteRDD[K, V] (
* @return Spark schema.
*/
private def buildSchema(fieldsMeta: java.util.List[GridQueryFieldMetadata]): StructType = {
+    // One nullable StructField per query field; the Spark type is resolved by IgniteRDD.dataType.
- new StructType(fieldsMeta.map(i ⇒ new StructField(i.fieldName(), dataType(i.fieldTypeName()), nullable = true))
+ new StructType(fieldsMeta.map(i ⇒
+ new StructField(i.fieldName(), IgniteRDD.dataType(i.fieldTypeName(), i.fieldName()), nullable = true))
.toArray)
}
/**
- * Gets Spark data type based on type name.
+ * Generates affinity key for given cluster node.
*
- * @param typeName Type name.
- * @return Spark data type.
+ * @param value Value to generate key for.
+ * @param node Node to generate key for.
+ * @return Affinity key.
+ * @note A bounded number of random keys is tried; if none maps to `node`, an arbitrary random key is returned.
*/
- private def dataType(typeName: String): DataType = typeName match {
+  private def affinityKeyFunc(value: V, node: ClusterNode): IgniteUuid = {
+    val aff = ic.ignite().affinity[IgniteUuid](cacheName)
+
+    // The number of attempts must be bounded explicitly: `Stream.from(start, step)` is infinite
+    // (its second argument is a step, not a count), so searching it never terminates when no
+    // key maps to `node`.
+    Iterator.fill(Math.max(1000, aff.partitions() * 2))(IgniteUuid.randomUuid())
+      .find(key ⇒ node == null || aff.mapKeyToNode(key).eq(node))
+      .getOrElse(IgniteUuid.randomUuid())
+  }
+}
+
+object IgniteRDD {
+ /**
+ * Gets Spark data type based on type name.
+ *
+ * @param typeName Type name.
+ * @return Spark data type.
+ */
+ def dataType(typeName: String, fieldName: String): DataType = typeName match {
case "java.lang.Boolean" ⇒ BooleanType
case "java.lang.Byte" ⇒ ByteType
case "java.lang.Short" ⇒ ShortType
@@ -343,17 +361,24 @@ class IgniteRDD[K, V] (
}
/**
- * Generates affinity key for given cluster node.
- *
- * @param value Value to generate key for.
- * @param node Node to generate key for.
- * @return Affinity key.
- */
- private def affinityKeyFunc(value: V, node: ClusterNode): IgniteUuid = {
- val aff = ic.ignite().affinity[IgniteUuid](cacheName)
-
- Stream.from(1, Math.max(1000, aff.partitions() * 2))
- .map(_ ⇒ IgniteUuid.randomUuid()).find(node == null || aff.mapKeyToNode(_).eq(node))
- .getOrElse(IgniteUuid.randomUuid())
- }
+   * Converts values Spark SQL cannot handle into supported ones: every `java.util.Date` that is
+   * not a `java.sql.Timestamp` becomes a `java.sql.Date` with the same epoch milliseconds.
+   *
+   * @param input Any value, possibly `null`.
+   * @return `null` for `null`; the same instance for a `java.sql.Timestamp`; a new `java.sql.Date`
+   *         for any other `java.util.Date` instance; otherwise `input` unchanged.
+   */
+  def convertIfNeeded(input: Any): Any = input match {
+    case null ⇒
+      null
+
+    // Must precede the java.util.Date case: Timestamp is a subclass of java.util.Date.
+    case ts: java.sql.Timestamp ⇒
+      ts
+
+    //Spark SQL doesn't support java.util.Date see - https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types
+    case utilDate: java.util.Date ⇒
+      new java.sql.Date(utilDate.getTime)
+
+    case other ⇒
+      other
+  }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteDataFramePartition.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteDataFramePartition.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteDataFramePartition.scala
new file mode 100644
index 0000000..4c9c72e
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteDataFramePartition.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.cluster.ClusterNode
+import org.apache.spark.Partition
+
+/**
+ * Spark partition of a Data Frame backed by an Ignite SQL table.
+ *
+ * @param sparkPartIdx Index of this partition within the Spark RDD (returned by `index`).
+ * @param primary Primary node for the Ignite partitions listed in `igniteParts`.
+ * @param igniteParts Ignite partition numbers covered by this Spark partition.
+ */
+case class IgniteDataFramePartition(sparkPartIdx: Int, primary: ClusterNode, igniteParts: List[Int]) extends Partition {
+  override def index: Int = sparkPartIdx
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala
index 2def636..2107a5f 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala
@@ -19,6 +19,6 @@ package org.apache.ignite.spark.impl
import org.apache.spark.Partition
-class IgnitePartition(idx: Int) extends Partition {
+/**
+ * Spark partition identified only by its index `idx`.
+ * Declared as a case class, so two instances with the same index are equal.
+ */
+case class IgnitePartition(idx: Int) extends Partition {
override def index: Int = idx
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
new file mode 100644
index 0000000..4762d8d
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.configuration.IgniteConfiguration
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.internal.util.IgniteUtils
+import org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE
+import org.apache.ignite.spark.IgniteContext
+import org.apache.ignite.spark.IgniteDataFrameSettings._
+import org.apache.ignite.IgniteException
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.ignite.IgniteExternalCatalog.OPTION_GRID
+import org.apache.spark.sql.sources._
+
+/**
+ * Apache Ignite relation provider.
+ */
+class IgniteRelationProvider extends RelationProvider with DataSourceRegister {
+  /**
+   * @return "ignite" - name of relation provider.
+   */
+  override def shortName(): String = FORMAT_IGNITE
+
+  /**
+   * To create IgniteRelation we need a link to an Ignite cluster and a table name.
+   * To refer to the cluster the user has to specify one of the following parameters:
+   * <ul>
+   *   <li><code>config</code> - path to ignite configuration file.</li>
+   *   <li><code>grid</code> - grid name. Note that grid has to be started in the same jvm.</li>
+   * </ul>
+   * If both are given, <code>config</code> is used.
+   * Existing table inside Apache Ignite should be referred via <code>table</code> parameter.
+   *
+   * @param sqlCtx SQLContext.
+   * @param params Parameters for relation creation.
+   * @return IgniteRelation.
+   * @throws IgniteException if neither <code>config</code> nor <code>grid</code> is given,
+   *     or if <code>table</code> is missing.
+   * @see IgniteRelation
+   * @see IgnitionEx#grid(String)
+   */
+  override def createRelation(sqlCtx: SQLContext, params: Map[String, String]): BaseRelation = {
+    // Captured here and shipped inside the closures below, so JVMs without a resolvable
+    // Ignite home can fall back to this value (see IgniteContext.setIgniteHome).
+    val igniteHome = IgniteUtils.getIgniteHome
+
+    def configProvider: () ⇒ IgniteConfiguration = {
+      if (params.contains(OPTION_CONFIG_FILE))
+        () ⇒ {
+          IgniteContext.setIgniteHome(igniteHome)
+
+          val cfg = IgnitionEx.loadConfiguration(params(OPTION_CONFIG_FILE)).get1()
+
+          cfg.setClientMode(true)
+
+          cfg
+        }
+      else if (params.contains(OPTION_GRID))
+        () ⇒ {
+          IgniteContext.setIgniteHome(igniteHome)
+
+          // Work on a copy: configuration() of a running grid is that node's own configuration,
+          // and forcing client mode on it would alter the running node's settings.
+          val cfg = new IgniteConfiguration(ignite(params(OPTION_GRID)).configuration())
+
+          cfg.setClientMode(true)
+
+          cfg
+        }
+      else
+        throw new IgniteException("'config' or 'grid' must be specified to connect to ignite cluster.")
+    }
+
+    // Validate all options before an IgniteContext is created.
+    val cfgProvider = configProvider
+
+    val tableName = params.getOrElse(OPTION_TABLE,
+      throw new IgniteException("'table' must be specified for loading ignite data."))
+
+    // Locale.ROOT keeps the upper-cased table name independent of the JVM default locale.
+    IgniteSQLRelation(
+      IgniteContext(sqlCtx.sparkContext, cfgProvider),
+      tableName.toUpperCase(java.util.Locale.ROOT),
+      sqlCtx)
+  }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala
new file mode 100644
index 0000000..93ef529
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.cache.CacheMode
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.{Partition, TaskContext}
+import java.util.{List ⇒ JList}
+
+/**
+ * Implementation of Spark RDD for Apache Ignite to support Data Frame API.
+ *
+ * Each partition runs `qryStr` (with `args` bound as query arguments) as an Ignite SQL fields
+ * query and converts every result row into a [[GenericRowWithSchema]] described by `schema`.
+ */
+class IgniteSQLDataFrameRDD[K, V](
+  ic: IgniteContext,
+  cacheName: String,
+  schema: StructType,
+  qryStr: String,
+  args: List[_],
+  parts: Array[Partition]) extends
+  IgniteSqlRDD[Row, JList[_], K, V](
+    ic,
+    cacheName,
+    cacheCfg = null,
+    qry = null,
+    r ⇒ new GenericRowWithSchema(r.toArray.map(IgniteRDD.convertIfNeeded), schema),
+    keepBinary = true,
+    parts) {
+
+  /**
+   * Executes an Ignite query for this RDD and returns an iterator over its results.
+   *
+   * For a non-REPLICATED cache and an [[IgniteDataFramePartition]], the query is restricted to
+   * that partition's Ignite partitions. Otherwise (REPLICATED cache, or any other partition type
+   * such as the default `IgnitePartition(0)`) the query is not restricted to specific partitions.
+   *
+   * @param partition Partition.
+   * @param context TaskContext.
+   * @return Results of query for specific partition.
+   */
+  override def compute(partition: Partition, context: TaskContext): Iterator[Row] = {
+    val qry0 = new SqlFieldsQuery(qryStr)
+
+    if (args.nonEmpty)
+      qry0.setArgs(args.map(_.asInstanceOf[Object]): _*)
+
+    val ccfg = ic.ignite().cache[K, V](cacheName).getConfiguration(classOf[CacheConfiguration[K, V]])
+
+    // Pattern match instead of a blind cast: partitions other than IgniteDataFramePartition
+    // carry no Ignite partition list and previously caused a ClassCastException here.
+    partition match {
+      case dfPart: IgniteDataFramePartition if ccfg.getCacheMode != CacheMode.REPLICATED ⇒
+        qry0.setPartitions(dfPart.igniteParts: _*)
+
+      case _ ⇒
+    }
+
+    qry = qry0
+
+    super.compute(partition, context)
+  }
+}
+
+object IgniteSQLDataFrameRDD {
+  /**
+   * Creates an [[IgniteSQLDataFrameRDD]] with the given key/value type parameters.
+   *
+   * @param ic Ignite context.
+   * @param cacheName Name of the cache the query runs against.
+   * @param schema Schema attached to every produced row.
+   * @param qryStr Ignite SQL query text.
+   * @param args Query arguments.
+   * @param parts Spark partitions; a single `IgnitePartition(0)` unless given.
+   * @return New RDD.
+   */
+  def apply[K, V](ic: IgniteContext,
+    cacheName: String,
+    schema: StructType,
+    qryStr: String,
+    args: List[_],
+    parts: Array[Partition] = Array(IgnitePartition(0))): IgniteSQLDataFrameRDD[K, V] =
+    // Explicit type arguments: without them K and V cannot be inferred from the constructor
+    // arguments and would silently become Nothing.
+    new IgniteSQLDataFrameRDD[K, V](ic, cacheName, schema, qryStr, args, parts)
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
new file mode 100644
index 0000000..bdcf57b
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.{Ignite, IgniteException}
+import org.apache.ignite.cache.{CacheMode, QueryEntity}
+import org.apache.ignite.cluster.ClusterNode
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
+import org.apache.spark.Partition
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SQLContext}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Apache Ignite implementation of Spark BaseRelation with PrunedFilteredScan for Ignite SQL Tables.
+ *
+ * Required columns and pushed-down filters are compiled into an Ignite SQL query that is
+ * executed by Ignite for every Spark partition (see `buildScan`).
+ *
+ * @param ic Ignite context.
+ * @param tableName Ignite SQL table name.
+ * @param sqlContext Spark SQL context.
+ */
+class IgniteSQLRelation[K, V](
+  private[spark] val ic: IgniteContext,
+  private[spark] val tableName: String)
+  (@transient val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {
+
+  /**
+   * @return Schema of Ignite SQL table.
+   * @throws IgniteException if no cache declares the table.
+   */
+  override def schema: StructType =
+    igniteSQLTable(ic.ignite(), tableName)
+      .map(IgniteSQLRelation.schema)
+      .getOrElse(throw new IgniteException(s"Unknown table $tableName"))
+
+  /**
+   * Builds Apache Ignite SQL Query for given table, columns and filters.
+   *
+   * Each filter is translated into SQL that is either exact or broader than the Spark filter, never
+   * narrower. This relation does not override `unhandledFilters`, so Spark still evaluates every
+   * filter on top of the scan.
+   *
+   * @param columns Columns to select.
+   * @param filters Filters to apply.
+   * @return Apache Ignite RDD implementation.
+   */
+  override def buildScan(columns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    val tableSchema = schema
+
+    val columnsStr =
+      if (columns.isEmpty)
+        "*"
+      else
+        columns.mkString(",")
+
+    // Rows contain only the selected columns, so they are described by the matching sub-schema.
+    val rowSchema =
+      if (columns.isEmpty)
+        tableSchema
+      else
+        StructType(columns.map(c ⇒ tableSchema(c)))
+
+    //Creating corresponding Ignite SQL query.
+    //Query will be executed by Ignite SQL Engine.
+    val (qryStr, qryArgs) =
+      if (filters.isEmpty)
+        (s"SELECT $columnsStr FROM $tableName", List.empty[Any])
+      else {
+        val (where, whereArgs) = compileWhere(filters)
+
+        (s"SELECT $columnsStr FROM $tableName WHERE $where", whereArgs)
+      }
+
+    IgniteSQLDataFrameRDD[K, V](ic, cacheName, rowSchema, qryStr, qryArgs, calcPartitions(filters))
+  }
+
+  override def toString: String = s"IgniteSQLRelation[table=$tableName]"
+
+  /**
+   * Builds `where` part of SQL query.
+   *
+   * @param filters Filter to apply.
+   * @return Tuple contains `where` string and `List[Any]` of query parameters.
+   */
+  private def compileWhere(filters: Array[Filter]): (String, List[Any]) =
+    filters.foldLeft(("", List[Any]()))(buildSingleClause)
+
+  /**
+   * Adds single where clause to `state` and returns new state.
+   *
+   * Compound clauses (AND, OR, NOT) are parenthesized so that the SQL operator precedence
+   * matches the structure of the Spark filter tree.
+   *
+   * @param state Current `where` state.
+   * @param clause Clause to add.
+   * @return `where` with given clause.
+   */
+  private def buildSingleClause(state: (String, List[Any]), clause: Filter): (String, List[Any]) = {
+    val filterStr = state._1
+    val params = state._2
+
+    clause match {
+      case EqualTo(attr, value) ⇒ (addStrClause(filterStr, s"$attr = ?"), params :+ value)
+
+      // Null-safe equality: `attr <=> NULL` holds exactly for NULL values, while `attr <=> v`
+      // (v non-null) holds exactly for non-null values equal to v and is never NULL itself,
+      // which also keeps NOT(...) correct.
+      case EqualNullSafe(attr, null) ⇒ (addStrClause(filterStr, s"$attr IS NULL"), params)
+
+      case EqualNullSafe(attr, value) ⇒
+        (addStrClause(filterStr, s"($attr IS NOT NULL AND $attr = ?)"), params :+ value)
+
+      case GreaterThan(attr, value) ⇒ (addStrClause(filterStr, s"$attr > ?"), params :+ value)
+
+      case GreaterThanOrEqual(attr, value) ⇒ (addStrClause(filterStr, s"$attr >= ?"), params :+ value)
+
+      case LessThan(attr, value) ⇒ (addStrClause(filterStr, s"$attr < ?"), params :+ value)
+
+      case LessThanOrEqual(attr, value) ⇒ (addStrClause(filterStr, s"$attr <= ?"), params :+ value)
+
+      // `attr IN ()` is not valid SQL; an empty IN list matches nothing.
+      case In(attr, values) if values.isEmpty ⇒ (addStrClause(filterStr, "1 = 0"), params)
+
+      case In(attr, values) ⇒
+        (addStrClause(filterStr, s"$attr IN (${values.map(_ ⇒ "?").mkString(",")})"), params ++ values)
+
+      case IsNull(attr) ⇒ (addStrClause(filterStr, s"$attr IS NULL"), params)
+
+      case IsNotNull(attr) ⇒ (addStrClause(filterStr, s"$attr IS NOT NULL"), params)
+
+      case And(left, right) ⇒
+        val leftClause = buildSingleClause(("", params), left)
+        val rightClause = buildSingleClause(("", leftClause._2), right)
+
+        (addStrClause(filterStr, s"(${leftClause._1} AND ${rightClause._1})"), rightClause._2)
+
+      case Or(left, right) ⇒
+        val leftClause = buildSingleClause(("", params), left)
+        val rightClause = buildSingleClause(("", leftClause._2), right)
+
+        (addStrClause(filterStr, s"(${leftClause._1} OR ${rightClause._1})"), rightClause._2)
+
+      case Not(child) ⇒
+        val innerClause = buildSingleClause(("", params), child)
+
+        (addStrClause(filterStr, s"NOT (${innerClause._1})"), innerClause._2)
+
+      case StringStartsWith(attr, value) ⇒
+        (addStrClause(filterStr, s"$attr LIKE ?"), params :+ (value + "%"))
+
+      case StringEndsWith(attr, value) ⇒
+        (addStrClause(filterStr, s"$attr LIKE ?"), params :+ ("%" + value))
+
+      case StringContains(attr, value) ⇒
+        (addStrClause(filterStr, s"$attr LIKE ?"), params :+ ("%" + value + "%"))
+    }
+  }
+
+  /**
+   * Computes the Spark partitions for a scan.
+   *
+   * For a REPLICATED cache a single partition is returned, bound to the first server node that
+   * hosts the cache. Otherwise Ignite partitions are grouped by their primary node and every group
+   * becomes one Spark partition whose `index` equals its position in the returned array.
+   *
+   * @param filters Pushed-down filters (currently unused).
+   * @return Spark partitions.
+   */
+  private def calcPartitions(filters: Array[Filter]): Array[Partition] = {
+    val cache = ic.ignite().cache[K, V](cacheName)
+
+    val ccfg = cache.getConfiguration(classOf[CacheConfiguration[K, V]])
+
+    val aff = ic.ignite().affinity(cacheName)
+
+    if (ccfg.getCacheMode == CacheMode.REPLICATED) {
+      val serverNodes = ic.ignite().cluster().forCacheNodes(cacheName).forServers().nodes()
+
+      Array(IgniteDataFramePartition(0, serverNodes.head, (0 until aff.partitions()).toList))
+    }
+    else {
+      // groupBy keeps the partitions of every group in ascending order.
+      val nodesToParts = (0 until aff.partitions()).groupBy(ignitePartIdx ⇒
+        aff.mapPartitionToPrimaryAndBackups(ignitePartIdx).head)
+
+      // Index a Seq, not the Map: zipWithIndex on a Map builds another Map whose iteration
+      // order (hash-based for more than 4 entries) need not follow the assigned indices.
+      nodesToParts.toSeq.zipWithIndex.map { case ((node, nodeParts), i) ⇒
+        IgniteDataFramePartition(i, node, nodeParts.toList)
+      }.toArray
+    }
+  }
+
+  /**
+   * Cache name for a table name.
+   */
+  private lazy val cacheName: String =
+    sqlCacheName(ic.ignite(), tableName)
+      .getOrElse(throw new IgniteException(s"Unknown table $tableName"))
+
+  /**
+   * Utility method to add clause to sql WHERE string.
+   *
+   * @param filterStr Current filter string
+   * @param clause Clause to add.
+   * @return Filter string.
+   */
+  private def addStrClause(filterStr: String, clause: String) =
+    if (filterStr.isEmpty)
+      clause
+    else
+      filterStr + " AND " + clause
+
+}
+
+object IgniteSQLRelation {
+  /**
+   * Converts an Apache Ignite table description (<code>QueryEntity</code>) into a Spark table
+   * description (<code>StructType</code>).
+   *
+   * Non-key columns come first and key columns last, each group in the entity's field order.
+   * Key columns are non-nullable; all other columns are nullable.
+   *
+   * @param table Ignite table description.
+   * @return Spark table description.
+   */
+  def schema(table: QueryEntity): StructType = {
+    //Partition columns has to be in the end of list.
+    //See `org.apache.spark.sql.catalyst.catalog.CatalogTable#partitionSchema`
+    val flagged = table.getFields.toList.map { case (name, typeName) ⇒
+      (name, typeName, isKeyColumn(table, name))
+    }
+
+    val (keyCols, otherCols) = flagged.partition(_._3)
+
+    val fields = (otherCols ++ keyCols).map { case (name, typeName, isKey) ⇒
+      StructField(
+        name = name,
+        dataType = IgniteRDD.dataType(typeName, name),
+        nullable = !isKey,
+        metadata = Metadata.empty)
+    }
+
+    StructType(fields)
+  }
+
+  /**
+   * Creates a relation for the given Ignite SQL table.
+   *
+   * @param ic Ignite context.
+   * @param tableName Ignite SQL table name.
+   * @param sqlContext Spark SQL context.
+   * @return New relation.
+   */
+  def apply[K, V](ic: IgniteContext, tableName: String, sqlContext: SQLContext): IgniteSQLRelation[K, V] = {
+    val relation = new IgniteSQLRelation[K, V](ic, tableName)(sqlContext)
+
+    relation
+  }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
index f386f26..c843d61 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
@@ -28,9 +28,10 @@ class IgniteSqlRDD[R: ClassTag, T, K, V](
ic: IgniteContext,
cacheName: String,
cacheCfg: CacheConfiguration[K, V],
- qry: Query[T],
+ var qry: Query[T],
conv: (T) ⇒ R,
- keepBinary: Boolean
+ keepBinary: Boolean,
+ partitions: Array[Partition] = Array(IgnitePartition(0))
) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg, keepBinary) {
override def compute(split: Partition, context: TaskContext): Iterator[R] = {
val cur = ensureCache().query(qry)
@@ -40,7 +41,12 @@ class IgniteSqlRDD[R: ClassTag, T, K, V](
new IgniteQueryIterator[T, R](cur.iterator(), conv)
}
- override protected def getPartitions: Array[Partition] = {
- Array(new IgnitePartition(0))
- }
+  // Returns the partitions passed to the constructor (a single IgnitePartition(0) by default).
+ override protected def getPartitions: Array[Partition] = partitions
+}
+
+object IgniteSqlRDD {
+  /**
+   * Factory for [[IgniteSqlRDD]] that forwards every argument to the class constructor unchanged.
+   *
+   * @param partitions Spark partitions of the RDD; a single `IgnitePartition(0)` unless given.
+   */
+  def apply[R: ClassTag, T, K, V](
+    ic: IgniteContext,
+    cacheName: String,
+    cacheCfg: CacheConfiguration[K, V],
+    qry: Query[T],
+    conv: (T) ⇒ R,
+    keepBinary: Boolean,
+    partitions: Array[Partition] = Array(IgnitePartition(0))): IgniteSqlRDD[R, T, K, V] =
+    new IgniteSqlRDD[R, T, K, V](
+      ic = ic,
+      cacheName = cacheName,
+      cacheCfg = cacheCfg,
+      qry = qry,
+      conv = conv,
+      keepBinary = keepBinary,
+      partitions = partitions)
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
new file mode 100644
index 0000000..815854c
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.commons.lang.StringUtils.equalsIgnoreCase
+import org.apache.ignite.{Ignite, IgniteException, IgniteState, Ignition}
+import org.apache.ignite.cache.QueryEntity
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.internal.util.lang.GridFunc.contains
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+
+import scala.collection.JavaConversions._
+
+package object impl {
+  /**
+   * Checks that a running Ignite instance with the given name exists.
+   *
+   * @param gridName Name of grid; the empty string denotes the unnamed instance.
+   * @throws IgniteException if no such instance is in the STARTED state.
+   */
+  def ensureIgnite(gridName: String): Unit =
+    if (!igniteExists(gridName))
+      throw new IgniteException(s"Ignite grid with name '$gridName' does not exist.")
+
+  /**
+   * @param gridName Name of grid; the empty string denotes the unnamed instance.
+   * @return True if the named instance of Ignite is in the STARTED state, false otherwise.
+   */
+  def igniteExists(gridName: String): Boolean =
+    if (gridName == "")
+      Ignition.state() == IgniteState.STARTED
+    else
+      Ignition.state(gridName) == IgniteState.STARTED
+
+  /**
+   * @param g Ignite.
+   * @return Name of Ignite, or the empty string when the name is null.
+   */
+  def igniteName(g: Ignite): String =
+    if (g.name() != null)
+      g.name
+    else
+      ""
+
+  /**
+   * @param name Name of grid.
+   * @param default Fallback instance.
+   * @return The grid named `name`. Only when `name` equals Spark's default database name
+   *         (`SessionCatalog.DEFAULT_DATABASE`) and no grid with that name is running is
+   *         `default` returned instead.
+   */
+  def igniteOrDefault(name: String, default: Ignite): Ignite =
+    if (name == SessionCatalog.DEFAULT_DATABASE) {
+      if (igniteExists(name))
+        ignite(name)
+      else
+        default
+    }
+    else
+      ignite(name)
+
+  /**
+   * @param gridName Name of grid.
+   * @return Named instance of grid. If `gridName` is empty the unnamed instance is returned.
+   */
+  def ignite(gridName: String): Ignite =
+    if (gridName == "")
+      Ignition.ignite()
+    else
+      Ignition.ignite(gridName)
+
+  /**
+   * @param ignite Ignite instance.
+   * @param tabName Table name (matched case-insensitively).
+   * @return True if table exists false otherwise.
+   */
+  def sqlTableExists(ignite: Ignite, tabName: String): Boolean =
+    sqlTableInfo(ignite, tabName).isDefined
+
+  /**
+   * @param ignite Ignite instance.
+   * @param tabName Table name (matched case-insensitively).
+   * @return QueryEntity for a given table.
+   */
+  def igniteSQLTable(ignite: Ignite, tabName: String): Option[QueryEntity] =
+    sqlTableInfo[Any, Any](ignite, tabName).map(_._2)
+
+  /**
+   * @param ignite Ignite instance.
+   * @param tabName Table name (matched case-insensitively).
+   * @return Cache name for given table.
+   */
+  def sqlCacheName(ignite: Ignite, tabName: String): Option[String] =
+    sqlTableInfo[Any, Any](ignite, tabName).map(_._1.getName)
+
+  /**
+   * Finds the first cache whose configuration declares a query entity with the given table name.
+   *
+   * Caches are inspected lazily, so the search stops at the first match. Query entities without
+   * an explicit table name (`getTableName` returns null) never match instead of raising a
+   * NullPointerException.
+   *
+   * @param ignite Ignite instance.
+   * @param tabName Table name (matched case-insensitively).
+   * @tparam K Key class.
+   * @tparam V Value class.
+   * @return CacheConfiguration and QueryEntity for a given table.
+   */
+  private def sqlTableInfo[K, V](ignite: Ignite, tabName: String): Option[(CacheConfiguration[K, V], QueryEntity)] =
+    ignite.cacheNames().toStream.flatMap { cacheName ⇒
+      val ccfg = ignite.cache[K, V](cacheName).getConfiguration(classOf[CacheConfiguration[K, V]])
+
+      ccfg.getQueryEntities
+        .find(qe ⇒ Option(qe.getTableName).exists(_.equalsIgnoreCase(tabName)))
+        .map(qe ⇒ (ccfg, qe))
+    }.headOption
+
+  /**
+   * @param table Table.
+   * @param column Column name.
+   * @return `True` if column is one of the entity's key fields or equals its key field name (ignoring case).
+   */
+  def isKeyColumn(table: QueryEntity, column: String): Boolean =
+    contains(table.getKeyFields, column) || equalsIgnoreCase(table.getKeyFieldName, column)
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
new file mode 100644
index 0000000..9c1fe0c
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.ignite
+
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE
+import org.apache.ignite.spark.IgniteContext
+import org.apache.ignite.spark.IgniteDataFrameSettings._
+import org.apache.ignite.spark.impl.IgniteSQLRelation.schema
+import org.apache.ignite.{Ignite, Ignition}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, NoSuchTableException}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.types.StructType
+import org.apache.ignite.spark.impl._
+import org.apache.spark.sql.ignite.IgniteExternalCatalog.OPTION_GRID
+
+import scala.collection.JavaConversions._
+
+/**
+ * External catalog implementation to provide transparent access to SQL tables existing in Ignite.
+ *
+ * Databases of this catalog are Ignite instances and tables are the query entities declared in their caches.
+ * The catalog is read-only: creating, altering, renaming or dropping databases, tables and partitions throws
+ * `UnsupportedOperationException`, while data loading and function create/drop/rename calls are no-ops.
+ *
+ * @param defaultIgniteContext Ignite context providing the initial default Ignite instance.
+ *                             The default instance can be changed later with `setCurrentDatabase`.
+ */
+private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext) extends ExternalCatalog {
+    /**
+     * Default Ignite instance. Initialized from `defaultIgniteContext`, replaced by `setCurrentDatabase`.
+     */
+    @transient private var default: Ignite = defaultIgniteContext.ignite
+
+    /**
+     * @param db Ignite instance name.
+     * @return Description of Ignite instance; `db` is used both as its name and its description.
+     */
+    override def getDatabase(db: String): CatalogDatabase =
+        CatalogDatabase(db, db, null, Map.empty)
+
+    /**
+     * Checks Ignite instance with provided name exists.
+     * If <code>db == SessionCatalog.DEFAULT_DATABASE</code> returns `true` without checking any Ignite instance.
+     *
+     * @param db Ignite instance name or <code>SessionCatalog.DEFAULT_DATABASE</code>.
+     * @return `True` if Ignite instance exists.
+     */
+    override def databaseExists(db: String): Boolean =
+        db == SessionCatalog.DEFAULT_DATABASE || igniteExists(db)
+
+    /**
+     * @return List of all known Ignite instances names.
+     */
+    override def listDatabases(): Seq[String] =
+        Ignition.allGrids().map(igniteName)
+
+    /**
+     * @param pattern Pattern to filter databases names.
+     * @return List of all known Ignite instances names filtered by pattern.
+     */
+    override def listDatabases(pattern: String): Seq[String] =
+        StringUtils.filterPattern(listDatabases(), pattern)
+
+    /**
+     * Sets default Ignite instance.
+     *
+     * @param db Name of Ignite instance.
+     */
+    override def setCurrentDatabase(db: String): Unit = {
+        ensureIgnite(db)
+
+        default = ignite(db)
+    }
+
+    /**
+     * @inheritdoc
+     *
+     * @throws NoSuchTableException If the table is not declared by any cache of the Ignite instance.
+     */
+    override def getTable(db: String, table: String): CatalogTable =
+        getTableOption(db, table).getOrElse(throw new NoSuchTableException(db = db, table = table))
+
+    /** @inheritdoc */
+    override def getTableOption(db: String, tabName: String): Option[CatalogTable] = {
+        val ignite = igniteOrDefault(db, default)
+
+        val gridName = igniteName(ignite)
+
+        igniteSQLTable(ignite, tabName).map { table ⇒
+            val tableName = table.getTableName
+
+            new CatalogTable(
+                identifier = new TableIdentifier(tableName, Some(gridName)),
+                tableType = CatalogTableType.EXTERNAL,
+                storage = CatalogStorageFormat(
+                    locationUri = None,
+                    inputFormat = Some(FORMAT_IGNITE),
+                    outputFormat = Some(FORMAT_IGNITE),
+                    serde = None,
+                    compressed = false,
+                    properties = Map(
+                        OPTION_GRID → gridName,
+                        OPTION_TABLE → tableName)
+                ),
+                schema = schema(table),
+                provider = Some(FORMAT_IGNITE),
+                // Key fields are reported as partition columns. Without explicit key fields fall back to the
+                // key field name, if any, so that a `null` column name is never reported.
+                partitionColumnNames =
+                    if (table.getKeyFields != null)
+                        table.getKeyFields.toSeq
+                    else
+                        Option(table.getKeyFieldName).toSeq,
+                bucketSpec = None)
+        }
+    }
+
+    /** @inheritdoc */
+    override def tableExists(db: String, table: String): Boolean =
+        sqlTableExists(igniteOrDefault(db, default), table)
+
+    /** @inheritdoc */
+    override def listTables(db: String): Seq[String] = listTables(db, "*")
+
+    /**
+     * @inheritdoc
+     *
+     * Table names are taken from the query entities of every cache of the Ignite instance; entities without
+     * an explicit table name are skipped. The result is filtered with `StringUtils.filterPattern`.
+     */
+    override def listTables(db: String, pattern: String): Seq[String] = {
+        val ignite = igniteOrDefault(db, default)
+
+        val tableNames = ignite.cacheNames.toSeq.flatMap { name ⇒
+            val ccfg = ignite.cache[Any, Any](name).getConfiguration(classOf[CacheConfiguration[Any, Any]])
+
+            ccfg.getQueryEntities.map(_.getTableName)
+        }.filter(_ != null)
+
+        StringUtils.filterPattern(tableNames, pattern)
+    }
+
+    /** @inheritdoc */
+    override def loadTable(db: String, table: String,
+        loadPath: String, isOverwrite: Boolean, isSrcLocal: Boolean): Unit = { /* no-op */ }
+
+    /**
+     * @inheritdoc
+     *
+     * @throws NoSuchPartitionException Always, since `getPartitionOption` never resolves a partition.
+     */
+    override def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition =
+        getPartitionOption(db, table, spec).getOrElse(throw new NoSuchPartitionException(db, table, spec))
+
+    /** @inheritdoc */
+    override def getPartitionOption(db: String, table: String,
+        spec: TablePartitionSpec): Option[CatalogTablePartition] = None
+
+    /**
+     * @inheritdoc
+     *
+     * Names are the Ignite partition numbers `0 .. partitions - 1` of the cache declaring the table;
+     * `partialSpec` is ignored.
+     */
+    override def listPartitionNames(db: String, table: String, partialSpec: Option[TablePartitionSpec]): Seq[String] = {
+        val ignite = igniteOrDefault(db, default)
+
+        sqlCacheName(ignite, table).map { cacheName ⇒
+            val parts = ignite.affinity(cacheName).partitions()
+
+            (0 until parts).map(_.toString)
+        }.getOrElse(Seq.empty)
+    }
+
+    /**
+     * @inheritdoc
+     *
+     * Every Ignite partition of the cache declaring the table is reported with its number, the id of its
+     * primary node and the comma-separated ids of its backup nodes; `partialSpec` is ignored.
+     */
+    override def listPartitions(db: String, table: String,
+        partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
+        val ignite = igniteOrDefault(db, default)
+
+        // Resolve the cache once and reuse it for all partitions instead of looking it up again with `.get`.
+        sqlCacheName(ignite, table).map { cacheName ⇒
+            val aff = ignite.affinity[Any](cacheName)
+
+            (0 until aff.partitions()).map { part ⇒
+                val name = part.toString
+
+                val nodes = aff.mapPartitionToPrimaryAndBackups(part)
+
+                if (nodes.isEmpty)
+                    throw new AnalysisException(s"Nodes for partition is empty [grid=${ignite.name},table=$table,partition=$name].")
+
+                CatalogTablePartition(
+                    Map[String, String](
+                        "name" → name,
+                        "primary" → nodes.head.id.toString,
+                        "backups" → nodes.tail.map(_.id.toString).mkString(",")
+                    ),
+                    CatalogStorageFormat.empty
+                )
+            }
+        }.getOrElse(Seq.empty)
+    }
+
+    /**
+     * @inheritdoc
+     *
+     * `predicates` are not evaluated: all partitions of the table are returned.
+     */
+    override def listPartitionsByFilter(db: String,
+        table: String,
+        predicates: Seq[Expression],
+        defaultTimeZoneId: String): Seq[CatalogTablePartition] =
+        listPartitions(db, table, None)
+
+    /** @inheritdoc */
+    override def loadPartition(db: String,
+        table: String,
+        loadPath: String,
+        partition: TablePartitionSpec, isOverwrite: Boolean,
+        inheritTableSpecs: Boolean, isSrcLocal: Boolean): Unit = { /* no-op */ }
+
+    /** @inheritdoc */
+    override def loadDynamicPartitions(db: String, table: String,
+        loadPath: String,
+        partition: TablePartitionSpec, replace: Boolean,
+        numDP: Int): Unit = { /* no-op */ }
+
+    /** @inheritdoc */
+    override def getFunction(db: String, funcName: String): CatalogFunction =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override def functionExists(db: String, funcName: String): Boolean = false
+
+    /** @inheritdoc */
+    override def listFunctions(db: String, pattern: String): Seq[String] = Seq.empty[String]
+
+    /** @inheritdoc */
+    override def alterDatabase(dbDefinition: CatalogDatabase): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override def alterTable(tableDefinition: CatalogTable): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override def alterTableSchema(db: String, table: String, schema: StructType): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override protected def doCreateFunction(db: String, funcDefinition: CatalogFunction): Unit = { /* no-op */ }
+
+    /** @inheritdoc */
+    override protected def doDropFunction(db: String, funcName: String): Unit = { /* no-op */ }
+
+    /** @inheritdoc */
+    override protected def doRenameFunction(db: String, oldName: String, newName: String): Unit = { /* no-op */ }
+
+    /** @inheritdoc */
+    override protected def doCreateDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override protected def doDropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override protected def doCreateTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override protected def doDropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override protected def doRenameTable(db: String, oldName: String, newName: String): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override def createPartitions(db: String, table: String,
+        parts: Seq[CatalogTablePartition],
+        ignoreIfExists: Boolean): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override def dropPartitions(db: String, table: String,
+        parts: Seq[TablePartitionSpec],
+        ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override def renamePartitions(db: String, table: String,
+        specs: Seq[TablePartitionSpec],
+        newSpecs: Seq[TablePartitionSpec]): Unit =
+        throw new UnsupportedOperationException("unsupported")
+
+    /** @inheritdoc */
+    override def alterPartitions(db: String, table: String,
+        parts: Seq[CatalogTablePartition]): Unit =
+        throw new UnsupportedOperationException("unsupported")
+}
+
+object IgniteExternalCatalog {
+    /**
+     * Name of the table storage property that holds the name of the Ignite instance (grid)
+     * to connect to when loading data. For internal use only.
+     *
+     * @see [[org.apache.ignite.Ignite#name()]]
+     */
+    private[apache] val OPTION_GRID: String = "grid"
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44428f31/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSharedState.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSharedState.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSharedState.scala
new file mode 100644
index 0000000..20a6fdf
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSharedState.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.ignite
+
+import org.apache.ignite.spark.IgniteContext
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, ExternalCatalogEvent, ExternalCatalogEventListener}
+import org.apache.spark.sql.internal.SharedState
+
+/**
+ * Shared state whose external catalog is an [[IgniteExternalCatalog]].
+ *
+ * @param igniteContext Ignite context handed to the created [[IgniteExternalCatalog]].
+ * @param sparkContext Spark context; catalog events are posted to its listener bus.
+ */
+private[ignite] class IgniteSharedState (
+    igniteContext: IgniteContext,
+    sparkContext: SparkContext) extends SharedState(sparkContext) {
+    /** @inheritdoc */
+    override lazy val externalCatalog: ExternalCatalog = {
+        val catalog = new IgniteExternalCatalog(igniteContext)
+
+        // Re-post every catalog event on the SparkContext listener bus.
+        val eventForwarder = new ExternalCatalogEventListener {
+            override def onEvent(event: ExternalCatalogEvent): Unit =
+                sparkContext.listenerBus.post(event)
+        }
+
+        catalog.addListener(eventForwarder)
+
+        catalog
+    }
+}