You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sh...@apache.org on 2016/11/01 02:37:33 UTC
[14/50] [abbrv] ignite git commit: IGNITE-2941: Add getOrStart method
to ignition IGNITE-2942: Use getOrStart in IgniteContext instead of current
try-catch structure This closes #631 Reviewed by Denis Magda,
Alexey Goncharuk.
IGNITE-2941: Add getOrStart method to ignition
IGNITE-2942: Use getOrStart in IgniteContext instead of current try-catch structure
This closes #631
Reviewed by Denis Magda, Alexey Goncharuk.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/efe76f18
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/efe76f18
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/efe76f18
Branch: refs/heads/ignite-2788
Commit: efe76f18db8f30492046e47994b1a96208df68f8
Parents: 50171eb
Author: Alexei Scherbakov <al...@gmail.com>
Authored: Tue Apr 19 07:11:54 2016 +0300
Committer: shtykh_roman <rs...@yahoo.com>
Committed: Fri May 13 16:11:14 2016 +0900
----------------------------------------------------------------------
.../main/java/org/apache/ignite/Ignition.java | 19 +-
.../org/apache/ignite/internal/IgnitionEx.java | 60 +++-
.../ignite/internal/GridGetOrStartSelfTest.java | 129 +++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 16 +-
.../org/apache/ignite/spark/IgniteContext.scala | 73 ++--
.../apache/ignite/spark/JavaIgniteContext.scala | 14 +-
.../spark/JavaEmbeddedIgniteRDDSelfTest.java | 343 +++++++++++++++++++
.../ignite/spark/JavaIgniteRDDSelfTest.java | 302 ----------------
.../spark/JavaStandaloneIgniteRDDSelfTest.java | 302 ++++++++++++++++
.../ignite/testsuites/IgniteRDDTestSuite.java | 40 +++
10 files changed, 935 insertions(+), 363 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/efe76f18/modules/core/src/main/java/org/apache/ignite/Ignition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignition.java b/modules/core/src/main/java/org/apache/ignite/Ignition.java
index 99ee1d9..b4c01f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignition.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignition.java
@@ -309,7 +309,7 @@ public class Ignition {
}
/**
- * Starts grid with given configuration. Note that this method is no-op if grid with the name
+ * Starts grid with given configuration. Note that this method will throw an exception if grid with the name
* provided in given configuration is already started.
*
* @param cfg Grid configuration. This cannot be {@code null}.
@@ -401,6 +401,23 @@ public class Ignition {
}
}
+
+ /**
+ * Gets or starts new grid instance if it hasn't been started yet.
+ *
+ * @param cfg Grid configuration. This cannot be {@code null}.
+ * @return Grid instance.
+ * @throws IgniteException If grid could not be started.
+ */
+ public static Ignite getOrStart(IgniteConfiguration cfg) throws IgniteException {
+ try {
+ return IgnitionEx.start(cfg, false);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
/**
* Loads Spring bean by its name from given Spring XML configuration file. If bean
* with such name doesn't exist, exception is thrown.
http://git-wip-us.apache.org/repos/asf/ignite/blob/efe76f18/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 227e5c2..c46a05c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -499,7 +499,7 @@ public class IgnitionEx {
U.warn(null, "Default Spring XML file not found (is IGNITE_HOME set?): " + DFLT_CFG);
- return start0(new GridStartContext(new IgniteConfiguration(), null, springCtx)).grid();
+ return start0(new GridStartContext(new IgniteConfiguration(), null, springCtx), true).grid();
}
/**
@@ -512,11 +512,25 @@ public class IgnitionEx {
* also if named grid has already been started.
*/
public static Ignite start(IgniteConfiguration cfg) throws IgniteCheckedException {
- return start(cfg, null);
+ return start(cfg, null, true);
}
/**
- * Starts grid with given configuration. Note that this method is no-op if grid with the name
+ * Starts a grid with given configuration. If the grid is already started and failIfStarted set to TRUE
+ * an exception will be thrown.
+ *
+ * @param cfg Grid configuration. This cannot be {@code null}.
+ * @param failIfStarted Throw or not an exception if grid is already started.
+ * @return Started grid.
+ * @throws IgniteCheckedException If grid could not be started. This exception will be thrown
+ * also if the grid has already been started and {@code failIfStarted} is {@code true}.
+ */
+ public static Ignite start(IgniteConfiguration cfg, boolean failIfStarted) throws IgniteCheckedException {
+ return start(cfg, null, failIfStarted);
+ }
+
+ /**
+ * Starts grid with given configuration. Note that this method will throw an exception if grid with the name
* provided in given configuration is already started.
*
* @param cfg Grid configuration. This cannot be {@code null}.
@@ -531,7 +545,27 @@ public class IgnitionEx {
public static Ignite start(IgniteConfiguration cfg, @Nullable GridSpringResourceContext springCtx) throws IgniteCheckedException {
A.notNull(cfg, "cfg");
- return start0(new GridStartContext(cfg, null, springCtx)).grid();
+ return start0(new GridStartContext(cfg, null, springCtx), true).grid();
+ }
+
+ /**
+ * Starts grid with given configuration. If the grid is already started and failIfStarted set to TRUE
+ * an exception will be thrown.
+ *
+ * @param cfg Grid configuration. This cannot be {@code null}.
+ * @param springCtx Optional Spring application context, possibly {@code null}.
+ * Spring bean definitions for bean injection are taken from this context.
+ * If provided, this context can be injected into grid tasks and grid jobs using
+ * {@link SpringApplicationContextResource @SpringApplicationContextResource} annotation.
+ * @param failIfStarted Throw or not an exception if grid is already started.
+ * @return Started grid.
+ * @throws IgniteCheckedException If grid could not be started. This exception will be thrown
+ * also if the grid has already been started and {@code failIfStarted} is {@code true}.
+ */
+ public static Ignite start(IgniteConfiguration cfg, @Nullable GridSpringResourceContext springCtx, boolean failIfStarted) throws IgniteCheckedException {
+ A.notNull(cfg, "cfg");
+
+ return start0(new GridStartContext(cfg, null, springCtx), failIfStarted).grid();
}
/**
@@ -927,7 +961,7 @@ public class IgnitionEx {
// Use either user defined context or our one.
IgniteNamedInstance grid = start0(
- new GridStartContext(cfg, springCfgUrl, springCtx == null ? cfgMap.get2() : springCtx));
+ new GridStartContext(cfg, springCfgUrl, springCtx == null ? cfgMap.get2() : springCtx), true);
// Add it if it was not stopped during startup.
if (grid != null)
@@ -958,10 +992,11 @@ public class IgnitionEx {
* Starts grid with given configuration.
*
* @param startCtx Start context.
+ * @param failIfStarted Throw or not an exception if grid is already started.
* @return Started grid.
* @throws IgniteCheckedException If grid could not be started.
*/
- private static IgniteNamedInstance start0(GridStartContext startCtx) throws IgniteCheckedException {
+ private static IgniteNamedInstance start0(GridStartContext startCtx, boolean failIfStarted ) throws IgniteCheckedException {
assert startCtx != null;
String name = startCtx.config().getGridName();
@@ -984,12 +1019,15 @@ public class IgnitionEx {
}
}
- if (old != null) {
- if (name == null)
- throw new IgniteCheckedException("Default Ignite instance has already been started.");
+ if (old != null)
+ if (failIfStarted) {
+ if (name == null)
+ throw new IgniteCheckedException("Default Ignite instance has already been started.");
+ else
+ throw new IgniteCheckedException("Ignite instance with this name has already been started: " + name);
+ }
else
- throw new IgniteCheckedException("Ignite instance with this name has already been started: " + name);
- }
+ return old;
if (startCtx.config().getWarmupClosure() != null)
startCtx.config().getWarmupClosure().apply(startCtx.config());
http://git-wip-us.apache.org/repos/asf/ignite/blob/efe76f18/modules/core/src/test/java/org/apache/ignite/internal/GridGetOrStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridGetOrStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridGetOrStartSelfTest.java
new file mode 100644
index 0000000..9b3985e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridGetOrStartSelfTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ * The GridGetOrStartSelfTest tests get or start semantics.
+ */
+
+@GridCommonTest(group = "Kernal Self")
+public class GridGetOrStartSelfTest extends GridCommonAbstractTest {
+ /** Concurrency. */
+ public static final int CONCURRENCY = 10;
+
+ /**
+ * Default constructor.
+ */
+ public GridGetOrStartSelfTest() {
+ super(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * Tests default grid
+ */
+ public void testDefaultGridGetOrStart() throws Exception {
+ IgniteConfiguration cfg = getConfiguration(null);
+
+ try (Ignite ignite = Ignition.getOrStart(cfg)) {
+ try {
+ Ignition.start(cfg);
+
+ fail("Expected exception after grid started");
+ }
+ catch (IgniteException ignored) {
+ }
+
+ Ignite ignite2 = Ignition.getOrStart(cfg);
+
+ assertEquals("Must return same instance", ignite, ignite2);
+ }
+
+ assertTrue(G.allGrids().isEmpty());
+ }
+
+ /**
+ * Tests named grid
+ */
+ public void testNamedGridGetOrStart() throws Exception {
+ IgniteConfiguration cfg = getConfiguration("test");
+ try (Ignite ignite = Ignition.getOrStart(cfg)) {
+ try {
+ Ignition.start(cfg);
+
+ fail("Expected exception after grid started");
+ }
+ catch (IgniteException ignored) {
+ // No-op.
+ }
+
+ Ignite ignite2 = Ignition.getOrStart(cfg);
+
+ assertEquals("Must return same instance", ignite, ignite2);
+ }
+
+ assertTrue(G.allGrids().isEmpty());
+ }
+
+ /**
+ * Tests concurrent grid initialization
+ */
+ public void testConcurrentGridGetOrStartCon() throws Exception {
+ final IgniteConfiguration cfg = getConfiguration(null);
+
+ final AtomicReference<Ignite> ref = new AtomicReference<>();
+
+ try {
+ GridTestUtils.runMultiThreaded(new Runnable() {
+ @Override public void run() {
+ // must return same instance in each thread
+
+ try {
+ Ignite ignite = Ignition.getOrStart(cfg);
+
+ boolean set = ref.compareAndSet(null, ignite);
+
+ if (!set)
+ assertEquals(ref.get(), ignite);
+ }
+ catch (IgniteException e) {
+ throw new RuntimeException("Ignite error", e);
+ }
+ }
+ }, CONCURRENCY, "GridCreatorThread");
+ }
+ catch (Exception ignored) {
+ fail("Exception is not expected");
+ }
+
+ G.stopAll(true);
+
+ assertTrue(G.allGrids().isEmpty());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/efe76f18/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 9e2324c..bb4b0f0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -20,19 +20,7 @@ package org.apache.ignite.testsuites;
import java.util.Set;
import junit.framework.TestSuite;
import org.apache.ignite.GridSuppressedExceptionSelfTest;
-import org.apache.ignite.internal.ClusterGroupHostsSelfTest;
-import org.apache.ignite.internal.ClusterGroupSelfTest;
-import org.apache.ignite.internal.GridFailFastNodeFailureDetectionSelfTest;
-import org.apache.ignite.internal.GridLifecycleAwareSelfTest;
-import org.apache.ignite.internal.GridLifecycleBeanSelfTest;
-import org.apache.ignite.internal.GridNodeMetricsLogSelfTest;
-import org.apache.ignite.internal.GridProjectionForCachesSelfTest;
-import org.apache.ignite.internal.GridReduceSelfTest;
-import org.apache.ignite.internal.GridReleaseTypeSelfTest;
-import org.apache.ignite.internal.GridSelfTest;
-import org.apache.ignite.internal.GridStartStopSelfTest;
-import org.apache.ignite.internal.GridStopWithCancelSelfTest;
-import org.apache.ignite.internal.IgniteSlowClientDetectionSelfTest;
+import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.affinity.GridAffinityProcessorRendezvousSelfTest;
import org.apache.ignite.internal.processors.cache.GridProjectionForCachesOnDaemonNodeSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteDaemonNodeMarshallerCacheTest;
@@ -129,6 +117,8 @@ public class IgniteBasicTestSuite extends TestSuite {
suite.addTestSuite(VariationsIteratorTest.class);
suite.addTestSuite(ConfigVariationsTestSuiteBuilderTest.class);
+ GridTestUtils.addTestIfNeeded(suite, GridGetOrStartSelfTest.class, ignoredTests);
+
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/efe76f18/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 57fe84f..182605c 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -17,13 +17,12 @@
package org.apache.ignite.spark
-
-import org.apache.ignite.internal.IgnitionEx
-import org.apache.ignite.internal.util.IgniteUtils
import org.apache.ignite._
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
-import org.apache.spark.{Logging, SparkContext}
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.internal.util.IgniteUtils
import org.apache.spark.sql.SQLContext
+import org.apache.spark.{Logging, SparkContext}
/**
* Ignite context.
@@ -36,15 +35,13 @@ import org.apache.spark.sql.SQLContext
class IgniteContext[K, V](
@transient val sparkContext: SparkContext,
cfgF: () \u21d2 IgniteConfiguration,
- client: Boolean = true
-) extends Serializable with Logging {
- @transient private val driver = true
-
+ standalone: Boolean = true
+ ) extends Serializable with Logging {
private val cfgClo = new Once(cfgF)
private val igniteHome = IgniteUtils.getIgniteHome
- if (!client) {
+ if (!standalone) {
// Get required number of executors with default equals to number of available executors.
val workers = sparkContext.getConf.getInt("spark.executor.instances",
sparkContext.getExecutorStorageStatus.length)
@@ -55,7 +52,7 @@ class IgniteContext[K, V](
logInfo("Will start Ignite nodes on " + workers + " workers")
// Start ignite server node on each worker in server mode.
- sparkContext.parallelize(1 to workers, workers).foreach(it \u21d2 ignite())
+ sparkContext.parallelize(1 to workers, workers).foreachPartition(it \u21d2 ignite())
}
// Make sure to start Ignite on context creation.
@@ -71,7 +68,7 @@ class IgniteContext[K, V](
sc: SparkContext,
springUrl: String,
client: Boolean
- ) {
+ ) {
this(sc, () \u21d2 IgnitionEx.loadConfiguration(springUrl).get1(), client)
}
@@ -84,7 +81,7 @@ class IgniteContext[K, V](
def this(
sc: SparkContext,
springUrl: String
- ) {
+ ) {
this(sc, () \u21d2 IgnitionEx.loadConfiguration(springUrl).get1())
}
@@ -124,10 +121,8 @@ class IgniteContext[K, V](
}
/**
- * Gets an Ignite instance supporting this context. Ignite instance will be started
- * if it has not been started yet.
- *
- * @return Ignite instance.
+ * Get or start Ignite instance if it's not started yet.
+ * @return Ignite instance.
*/
def ignite(): Ignite = {
val home = IgniteUtils.getIgniteHome
@@ -142,24 +137,17 @@ class IgniteContext[K, V](
val igniteCfg = cfgClo()
+ // check if called from driver
+ if (sparkContext != null) igniteCfg.setClientMode(true)
+
try {
- Ignition.ignite(igniteCfg.getGridName)
+ Ignition.getOrStart(igniteCfg)
}
catch {
- case e: IgniteIllegalStateException \u21d2
- try {
- igniteCfg.setClientMode(client || driver)
-
- Ignition.start(igniteCfg)
- }
- catch {
- case e: IgniteException \u21d2 {
- logError("Failed to start Ignite client. Will try to use an existing instance with name: "
- + igniteCfg.getGridName, e)
-
- Ignition.ignite(igniteCfg.getGridName)
- }
- }
+ case e: IgniteException \u21d2
+ logError("Failed to start Ignite.", e)
+
+ throw e
}
}
@@ -167,7 +155,25 @@ class IgniteContext[K, V](
* Stops supporting ignite instance. If ignite instance has been already stopped, this operation will be
* a no-op.
*/
- def close() = {
+ def close(shutdownIgniteOnWorkers: Boolean = false) = {
+ // additional check if called from driver
+ if (sparkContext != null && shutdownIgniteOnWorkers) {
+ // Get required number of executors with default equals to number of available executors.
+ val workers = sparkContext.getConf.getInt("spark.executor.instances",
+ sparkContext.getExecutorStorageStatus.length)
+
+ if (workers > 0) {
+ logInfo("Will stop Ignite nodes on " + workers + " workers")
+
+ // Stop ignite node on each worker.
+ sparkContext.parallelize(1 to workers, workers).foreachPartition(it \u21d2 doClose())
+ }
+ }
+
+ doClose()
+ }
+
+ private def doClose() = {
val igniteCfg = cfgClo()
Ignition.stop(igniteCfg.getGridName, false)
@@ -184,8 +190,11 @@ private class Once(clo: () \u21d2 IgniteConfiguration) extends Serializable {
def apply(): IgniteConfiguration = {
if (res == null) {
+
this.synchronized {
+
if (res == null)
+
res = clo()
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/efe76f18/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
index e2d57bf..44b1cd9 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -34,10 +34,16 @@ import scala.reflect.ClassTag
* @tparam V Value type.
*/
class JavaIgniteContext[K, V](
- @scala.transient val sc: JavaSparkContext,
- val cfgF: IgniteOutClosure[IgniteConfiguration]) extends Serializable {
+ @transient val sc: JavaSparkContext,
+ val cfgF: IgniteOutClosure[IgniteConfiguration],
+ standalone: Boolean = true
+ ) extends Serializable {
- @transient val ic: IgniteContext[K, V] = new IgniteContext[K, V](sc.sc, () => cfgF.apply())
+ @transient val ic: IgniteContext[K, V] = new IgniteContext[K, V](sc.sc, () => cfgF.apply(), standalone)
+
+ def this(sc: JavaSparkContext, cfgF: IgniteOutClosure[IgniteConfiguration]) {
+ this(sc, cfgF, true)
+ }
def this(sc: JavaSparkContext, springUrl: String) {
this(sc, new IgniteOutClosure[IgniteConfiguration] {
@@ -53,7 +59,7 @@ class JavaIgniteContext[K, V](
def ignite(): Ignite = ic.ignite()
- def close() = ic.close()
+ def close(shutdownIgniteOnWorkers:Boolean = false) = ic.close(shutdownIgniteOnWorkers)
private[spark] def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
http://git-wip-us.apache.org/repos/asf/ignite/blob/efe76f18/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
new file mode 100644
index 0000000..5ceaca7
--- /dev/null
+++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import scala.Tuple2;
+
+/**
+ * Tests for {@link JavaIgniteRDD} (embedded mode).
+ */
+public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
+ /** For grid names generation */
+ private static AtomicInteger cntr = new AtomicInteger(1);
+
+ /** Grid names. */
+ private static ThreadLocal<Integer> gridNames = new ThreadLocal<Integer>() {
+ @Override protected Integer initialValue() {
+ return cntr.getAndIncrement();
+ }
+ };
+
+ /** Grid count. */
+ private static final int GRID_CNT = 3;
+
+ /** Keys count. */
+ private static final int KEYS_CNT = 10000;
+
+ /** Cache name. */
+ private static final String PARTITIONED_CACHE_NAME = "partitioned";
+
+ /** Sum function. */
+ private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() {
+ public Integer call(Integer x, Integer y) {
+ return x + y;
+ }
+ };
+
+ /** To pair function. */
+ private static final PairFunction<Integer, String, String> TO_PAIR_F = new PairFunction<Integer, String, String>() {
+ /** {@inheritDoc} */
+ @Override public Tuple2<String, String> call(Integer i) {
+ return new Tuple2<>(String.valueOf(i), "val" + i);
+ }
+ };
+
+ /** (String, Integer) pair to Integer value function. */
+ private static final Function<Tuple2<String, Integer>, Integer> STR_INT_PAIR_TO_INT_F = new PairToValueFunction<>();
+
+ /** (String, Entity) pair to Entity value function. */
+ private static final Function<Tuple2<String, Entity>, Entity> STR_ENTITY_PAIR_TO_ENTITY_F =
+ new PairToValueFunction<>();
+
+ /** Integer to entity function. */
+ private static final PairFunction<Integer, String, Entity> INT_TO_ENTITY_F =
+ new PairFunction<Integer, String, Entity>() {
+ @Override public Tuple2<String, Entity> call(Integer i) throws Exception {
+ return new Tuple2<>(String.valueOf(i), new Entity(i, "name" + i, i * 100));
+ }
+ };
+
+ /**
+ * Default constructor.
+ */
+ public JavaEmbeddedIgniteRDDSelfTest() {
+ super(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * Creates default spark context
+ */
+ private JavaSparkContext createContext() {
+ SparkConf conf = new SparkConf();
+
+ conf.set("spark.executor.instances", String.valueOf(GRID_CNT));
+
+ return new JavaSparkContext("local[" + GRID_CNT + "]", "test", conf);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStoreDataToIgnite() throws Exception {
+ JavaSparkContext sc = createContext();
+
+ JavaIgniteContext<String, String> ic = null;
+
+ try {
+ ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false);
+
+ ic.fromCache(PARTITIONED_CACHE_NAME)
+ .savePairs(sc.parallelize(F.range(0, KEYS_CNT), GRID_CNT).mapToPair(TO_PAIR_F));
+
+ Ignite ignite = ic.ignite();
+
+ IgniteCache<String, String> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+ for (int i = 0; i < KEYS_CNT; i++) {
+ String val = cache.get(String.valueOf(i));
+
+ assertNotNull("Value was not put to cache for key: " + i, val);
+ assertEquals("Invalid value stored for key: " + i, "val" + i, val);
+ }
+ }
+ finally {
+ if (ic != null)
+ ic.close(true);
+
+ sc.stop();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReadDataFromIgnite() throws Exception {
+ JavaSparkContext sc = createContext();
+
+ JavaIgniteContext<String, Integer> ic = null;
+
+ try {
+ ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false);
+
+ Ignite ignite = ic.ignite();
+
+ IgniteCache<String, Integer> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+ for (int i = 0; i < KEYS_CNT; i++)
+ cache.put(String.valueOf(i), i);
+
+ JavaRDD<Integer> values = ic.fromCache(PARTITIONED_CACHE_NAME).map(STR_INT_PAIR_TO_INT_F);
+
+ int sum = values.fold(0, SUM_F);
+
+ int expSum = (KEYS_CNT * KEYS_CNT + KEYS_CNT) / 2 - KEYS_CNT;
+
+ assertEquals(expSum, sum);
+ }
+ finally {
+ if (ic != null)
+ ic.close(true);
+
+ sc.stop();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueryObjectsFromIgnite() throws Exception {
+ fail("IGNITE-3009");
+
+ JavaSparkContext sc = createContext();
+
+ JavaIgniteContext<String, Entity> ic = null;
+
+ try {
+ ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false);
+
+ JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
+
+ int cnt = 1001;
+
+ List<Integer> numbers = F.range(0, cnt);
+
+ cache.savePairs(sc.parallelize(numbers, GRID_CNT).mapToPair(INT_TO_ENTITY_F));
+
+ List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000)
+ .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect();
+
+ assertEquals("Invalid result length", 1, res.size());
+ assertEquals("Invalid result", 50, res.get(0).id());
+ assertEquals("Invalid result", "name50", res.get(0).name());
+ assertEquals("Invalid result", 5000, res.get(0).salary());
+
+// Ignite ignite = ic.ignite();
+// IgniteCache<Object, Object> underCache = ignite.cache(PARTITIONED_CACHE_NAME);
+// assertEquals("Invalid total count", cnt, underCache.size());
+
+ assertEquals("Invalid count", 500, cache.objectSql("Entity", "id > 500").count());
+ }
+ finally {
+ if (ic != null)
+ ic.close(true);
+
+ sc.stop();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueryFieldsFromIgnite() throws Exception {
+ JavaSparkContext sc = createContext();
+
+ JavaIgniteContext<String, Entity> ic = null;
+
+ try {
+ ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false);
+
+ JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
+
+ cache.savePairs(sc.parallelize(F.range(0, 1001), GRID_CNT).mapToPair(INT_TO_ENTITY_F));
+
+ DataFrame df =
+ cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000);
+
+ df.printSchema();
+
+ Row[] res = df.collect();
+
+ assertEquals("Invalid result length", 1, res.length);
+ assertEquals("Invalid result", 50, res[0].get(0));
+ assertEquals("Invalid result", "name50", res[0].get(1));
+ assertEquals("Invalid result", 5000, res[0].get(2));
+
+ Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000));
+
+ DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp);
+
+ df.printSchema();
+
+ Row[] res0 = df0.collect();
+
+ assertEquals("Invalid result length", 1, res0.length);
+ assertEquals("Invalid result", 50, res0[0].get(0));
+ assertEquals("Invalid result", "name50", res0[0].get(1));
+ assertEquals("Invalid result", 5000, res0[0].get(2));
+
+ assertEquals("Invalid count", 500, cache.sql("select id from Entity where id > 500").count());
+ }
+ finally {
+ if (ic != null)
+ ic.close(true);
+
+ sc.stop();
+ }
+ }
+
+ /** Finder. */
+ private static TcpDiscoveryVmIpFinder FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /**
+ * @param gridName Grid name.
+ * @param client Client.
+ */
+ private static IgniteConfiguration getConfiguration(String gridName, boolean client) throws Exception {
+ IgniteConfiguration cfg = new IgniteConfiguration();
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ cfg.setCacheConfiguration(cacheConfiguration());
+
+ cfg.setClientMode(client);
+
+ cfg.setGridName(gridName);
+
+ return cfg;
+ }
+
+ /**
+ * Creates cache configuration.
+ */
+ private static CacheConfiguration<Object, Object> cacheConfiguration() {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setBackups(1);
+
+ ccfg.setName(PARTITIONED_CACHE_NAME);
+
+ ccfg.setIndexedTypes(String.class, Entity.class);
+
+ return ccfg;
+ }
+
+ /**
+ * Ignite configuration provider.
+ */
+ static class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration> {
+ /** {@inheritDoc} */
+ @Override public IgniteConfiguration apply() {
+ try {
+ return getConfiguration("worker-" + gridNames.get(), false);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * @param <K> Type of the first tuple element.
+ * @param <V> Type of the second tuple element (the returned value).
+ */
+ static class PairToValueFunction<K, V> implements Function<Tuple2<K, V>, V> {
+ /** {@inheritDoc} */
+ @Override public V call(Tuple2<K, V> t) throws Exception {
+ return t._2();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/efe76f18/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
deleted file mode 100644
index becd90a..0000000
--- a/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spark;
-
-import java.util.List;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgniteOutClosure;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.Column;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.Row;
-import scala.Tuple2;
-
-/**
- * Tests for {@link JavaIgniteRDD}.
- */
-public class JavaIgniteRDDSelfTest extends GridCommonAbstractTest {
- /** Grid count. */
- private static final int GRID_CNT = 3;
-
- /** Keys count. */
- private static final int KEYS_CNT = 10000;
-
- /** Cache name. */
- private static final String PARTITIONED_CACHE_NAME = "partitioned";
-
- /** Ip finder. */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** Sum function. */
- private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() {
- public Integer call(Integer x, Integer y) {
- return x + y;
- }
- };
-
- /** To pair function. */
- private static final PairFunction<Integer, String, String> TO_PAIR_F = new PairFunction<Integer, String, String>() {
- /** {@inheritDoc} */
- @Override public Tuple2<String, String> call(Integer i) {
- return new Tuple2<>(String.valueOf(i), "val" + i);
- }
- };
-
- /** (String, Integer); pair to Integer value function. */
- private static final Function<Tuple2<String, Integer>, Integer> STR_INT_PAIR_TO_INT_F = new PairToValueFunction<>();
-
- /** (String, Entity) pair to Entity value function. */
- private static final Function<Tuple2<String, Entity>, Entity> STR_ENTITY_PAIR_TO_ENTITY_F =
- new PairToValueFunction<>();
-
- /** Integer to entity function. */
- private static final PairFunction<Integer, String, Entity> INT_TO_ENTITY_F =
- new PairFunction<Integer, String, Entity>() {
- @Override public Tuple2<String, Entity> call(Integer i) throws Exception {
- return new Tuple2<>(String.valueOf(i), new Entity(i, "name" + i, i * 100));
- }
- };
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- Ignition.ignite("grid-0").cache(PARTITIONED_CACHE_NAME).removeAll();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- Ignition.stop("client", false);
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- for (int i = 0; i < GRID_CNT; i++)
- Ignition.start(getConfiguration("grid-" + i, false));
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- for (int i = 0; i < GRID_CNT; i++)
- Ignition.stop("grid-" + i, false);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testStoreDataToIgnite() throws Exception {
- JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
-
- try {
- JavaIgniteContext<String, String> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
-
- ic.fromCache(PARTITIONED_CACHE_NAME)
- .savePairs(sc.parallelize(F.range(0, KEYS_CNT), 2).mapToPair(TO_PAIR_F));
-
- Ignite ignite = Ignition.ignite("grid-0");
-
- IgniteCache<String, String> cache = ignite.cache(PARTITIONED_CACHE_NAME);
-
- for (int i = 0; i < KEYS_CNT; i++) {
- String val = cache.get(String.valueOf(i));
-
- assertNotNull("Value was not put to cache for key: " + i, val);
- assertEquals("Invalid value stored for key: " + i, "val" + i, val);
- }
- }
- finally {
- sc.stop();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testReadDataFromIgnite() throws Exception {
- JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
-
- try {
- JavaIgniteContext<String, Integer> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
-
- Ignite ignite = Ignition.ignite("grid-0");
-
- IgniteCache<String, Integer> cache = ignite.cache(PARTITIONED_CACHE_NAME);
-
- for (int i = 0; i < KEYS_CNT; i++)
- cache.put(String.valueOf(i), i);
-
- JavaRDD<Integer> values = ic.fromCache(PARTITIONED_CACHE_NAME).map(STR_INT_PAIR_TO_INT_F);
-
- int sum = values.fold(0, SUM_F);
-
- int expSum = (KEYS_CNT * KEYS_CNT + KEYS_CNT) / 2 - KEYS_CNT;
-
- assertEquals(expSum, sum);
- }
- finally {
- sc.stop();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testQueryObjectsFromIgnite() throws Exception {
- JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
-
- try {
- JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
-
- JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
-
- cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
-
- List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000)
- .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect();
-
- assertEquals("Invalid result length", 1, res.size());
- assertEquals("Invalid result", 50, res.get(0).id());
- assertEquals("Invalid result", "name50", res.get(0).name());
- assertEquals("Invalid result", 5000, res.get(0).salary());
- assertEquals("Invalid count", 500, cache.objectSql("Entity", "id > 500").count());
- }
- finally {
- sc.stop();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testQueryFieldsFromIgnite() throws Exception {
- JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
-
- try {
- JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
-
- JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
-
- cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
-
- DataFrame df =
- cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000);
-
- df.printSchema();
-
- Row[] res = df.collect();
-
- assertEquals("Invalid result length", 1, res.length);
- assertEquals("Invalid result", 50, res[0].get(0));
- assertEquals("Invalid result", "name50", res[0].get(1));
- assertEquals("Invalid result", 5000, res[0].get(2));
-
- Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000));
-
- DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp);
-
- df.printSchema();
-
- Row[] res0 = df0.collect();
-
- assertEquals("Invalid result length", 1, res0.length);
- assertEquals("Invalid result", 50, res0[0].get(0));
- assertEquals("Invalid result", "name50", res0[0].get(1));
- assertEquals("Invalid result", 5000, res0[0].get(2));
-
- assertEquals("Invalid count", 500, cache.sql("select id from Entity where id > 500").count());
- }
- finally {
- sc.stop();
- }
-
- }
-
- /**
- * @param gridName Grid name.
- * @param client Client.
- */
- private static IgniteConfiguration getConfiguration(String gridName, boolean client) throws Exception {
- IgniteConfiguration cfg = new IgniteConfiguration();
-
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
- discoSpi.setIpFinder(IP_FINDER);
-
- cfg.setDiscoverySpi(discoSpi);
-
- cfg.setCacheConfiguration(cacheConfiguration());
-
- cfg.setClientMode(client);
-
- cfg.setGridName(gridName);
-
- return cfg;
- }
-
- /**
- * Creates cache configuration.
- */
- private static CacheConfiguration<Object, Object> cacheConfiguration() {
- CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
-
- ccfg.setBackups(1);
-
- ccfg.setName(PARTITIONED_CACHE_NAME);
-
- ccfg.setIndexedTypes(String.class, Entity.class);
-
- return ccfg;
- }
-
- /**
- * Ignite configiration provider.
- */
- static class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration> {
- /** {@inheritDoc} */
- @Override public IgniteConfiguration apply() {
- try {
- return getConfiguration("client", true);
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- /**
- * @param <K>
- * @param <V>
- */
- static class PairToValueFunction<K, V> implements Function<Tuple2<K, V>, V> {
- /** {@inheritDoc} */
- @Override public V call(Tuple2<K, V> t) throws Exception {
- return t._2();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/efe76f18/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java
new file mode 100644
index 0000000..faa8fda
--- /dev/null
+++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark;
+
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import scala.Tuple2;
+
+/**
+ * Tests for {@link JavaIgniteRDD} (standalone mode).
+ */
+public class JavaStandaloneIgniteRDDSelfTest extends GridCommonAbstractTest {
+ /** Grid count. */
+ private static final int GRID_CNT = 3;
+
+ /** Keys count. */
+ private static final int KEYS_CNT = 10000;
+
+ /** Cache name. */
+ private static final String PARTITIONED_CACHE_NAME = "partitioned";
+
+ /** Ip finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Sum function. */
+ private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() {
+ @Override public Integer call(Integer x, Integer y) {
+ return x + y;
+ }
+ };
+
+ /** To pair function. */
+ private static final PairFunction<Integer, String, String> TO_PAIR_F = new PairFunction<Integer, String, String>() {
+ /** {@inheritDoc} */
+ @Override public Tuple2<String, String> call(Integer i) {
+ return new Tuple2<>(String.valueOf(i), "val" + i);
+ }
+ };
+
+ /** (String, Integer) pair to Integer value function. */
+ private static final Function<Tuple2<String, Integer>, Integer> STR_INT_PAIR_TO_INT_F = new PairToValueFunction<>();
+
+ /** (String, Entity) pair to Entity value function. */
+ private static final Function<Tuple2<String, Entity>, Entity> STR_ENTITY_PAIR_TO_ENTITY_F =
+ new PairToValueFunction<>();
+
+ /** Integer to entity function. */
+ private static final PairFunction<Integer, String, Entity> INT_TO_ENTITY_F =
+ new PairFunction<Integer, String, Entity>() {
+ @Override public Tuple2<String, Entity> call(Integer i) throws Exception {
+ return new Tuple2<>(String.valueOf(i), new Entity(i, "name" + i, i * 100));
+ }
+ };
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ Ignition.ignite("grid-0").cache(PARTITIONED_CACHE_NAME).removeAll();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ Ignition.stop("client", false);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ for (int i = 0; i < GRID_CNT; i++)
+ Ignition.start(getConfiguration("grid-" + i, false));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ for (int i = 0; i < GRID_CNT; i++)
+ Ignition.stop("grid-" + i, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStoreDataToIgnite() throws Exception {
+ JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+ try {
+ JavaIgniteContext<String, String> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+ ic.fromCache(PARTITIONED_CACHE_NAME)
+ .savePairs(sc.parallelize(F.range(0, KEYS_CNT), 2).mapToPair(TO_PAIR_F));
+
+ Ignite ignite = Ignition.ignite("grid-0");
+
+ IgniteCache<String, String> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+ for (int i = 0; i < KEYS_CNT; i++) {
+ String val = cache.get(String.valueOf(i));
+
+ assertNotNull("Value was not put to cache for key: " + i, val);
+ assertEquals("Invalid value stored for key: " + i, "val" + i, val);
+ }
+ }
+ finally {
+ sc.stop();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReadDataFromIgnite() throws Exception {
+ JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+ try {
+ JavaIgniteContext<String, Integer> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+ Ignite ignite = Ignition.ignite("grid-0");
+
+ IgniteCache<String, Integer> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+ for (int i = 0; i < KEYS_CNT; i++)
+ cache.put(String.valueOf(i), i);
+
+ JavaRDD<Integer> values = ic.fromCache(PARTITIONED_CACHE_NAME).map(STR_INT_PAIR_TO_INT_F);
+
+ int sum = values.fold(0, SUM_F);
+
+ int expSum = (KEYS_CNT * KEYS_CNT + KEYS_CNT) / 2 - KEYS_CNT;
+
+ assertEquals(expSum, sum);
+ }
+ finally {
+ sc.stop();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueryObjectsFromIgnite() throws Exception {
+ JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+ try {
+ JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+ JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
+
+ cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
+
+ List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000)
+ .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect();
+
+ assertEquals("Invalid result length", 1, res.size());
+ assertEquals("Invalid result", 50, res.get(0).id());
+ assertEquals("Invalid result", "name50", res.get(0).name());
+ assertEquals("Invalid result", 5000, res.get(0).salary());
+ assertEquals("Invalid count", 500, cache.objectSql("Entity", "id > 500").count());
+ }
+ finally {
+ sc.stop();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueryFieldsFromIgnite() throws Exception {
+ JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+ try {
+ JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+ JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
+
+ cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
+
+ DataFrame df =
+ cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000);
+
+ df.printSchema();
+
+ Row[] res = df.collect();
+
+ assertEquals("Invalid result length", 1, res.length);
+ assertEquals("Invalid result", 50, res[0].get(0));
+ assertEquals("Invalid result", "name50", res[0].get(1));
+ assertEquals("Invalid result", 5000, res[0].get(2));
+
+ Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000));
+
+ DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp);
+
+ df0.printSchema();
+
+ Row[] res0 = df0.collect();
+
+ assertEquals("Invalid result length", 1, res0.length);
+ assertEquals("Invalid result", 50, res0[0].get(0));
+ assertEquals("Invalid result", "name50", res0[0].get(1));
+ assertEquals("Invalid result", 5000, res0[0].get(2));
+
+ assertEquals("Invalid count", 500, cache.sql("select id from Entity where id > 500").count());
+ }
+ finally {
+ sc.stop();
+ }
+
+ }
+
+ /**
+ * @param gridName Grid name set on the returned configuration.
+ * @param client Client mode flag set on the returned configuration.
+ */
+ private static IgniteConfiguration getConfiguration(String gridName, boolean client) throws Exception {
+ IgniteConfiguration cfg = new IgniteConfiguration();
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ cfg.setCacheConfiguration(cacheConfiguration());
+
+ cfg.setClientMode(client);
+
+ cfg.setGridName(gridName);
+
+ return cfg;
+ }
+
+ /**
+ * Creates the test cache configuration: one backup, named by PARTITIONED_CACHE_NAME, String/Entity indexed types.
+ */
+ private static CacheConfiguration<Object, Object> cacheConfiguration() {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setBackups(1);
+
+ ccfg.setName(PARTITIONED_CACHE_NAME);
+
+ ccfg.setIndexedTypes(String.class, Entity.class);
+
+ return ccfg;
+ }
+
+ /**
+ * Ignite configuration provider: returns a client-mode configuration with grid name "client".
+ */
+ static class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration> {
+ /** {@inheritDoc} */
+ @Override public IgniteConfiguration apply() {
+ try {
+ return getConfiguration("client", true);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * @param <K> Type of the first tuple element.
+ * @param <V> Type of the second tuple element, which {@code call} returns.
+ */
+ static class PairToValueFunction<K, V> implements Function<Tuple2<K, V>, V> {
+ /** {@inheritDoc} */
+ @Override public V call(Tuple2<K, V> t) throws Exception {
+ return t._2();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/efe76f18/modules/spark/src/test/java/org/apache/ignite/testsuites/IgniteRDDTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/testsuites/IgniteRDDTestSuite.java b/modules/spark/src/test/java/org/apache/ignite/testsuites/IgniteRDDTestSuite.java
new file mode 100644
index 0000000..a4177f0
--- /dev/null
+++ b/modules/spark/src/test/java/org/apache/ignite/testsuites/IgniteRDDTestSuite.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.spark.JavaEmbeddedIgniteRDDSelfTest;
+import org.apache.ignite.spark.JavaStandaloneIgniteRDDSelfTest;
+
+/**
+ * Test suite for Ignite RDD.
+ */
+public class IgniteRDDTestSuite extends TestSuite {
+ /**
+ * @return Java Ignite RDD test suite.
+ * @throws Exception If failed.
+ */
+ public static TestSuite suite() throws Exception {
+ TestSuite suite = new TestSuite("Java Ignite RDD tests (standalone and embedded modes)");
+
+ suite.addTest(new TestSuite(JavaEmbeddedIgniteRDDSelfTest.class));
+ suite.addTest(new TestSuite(JavaStandaloneIgniteRDDSelfTest.class));
+
+ return suite;
+ }
+}
\ No newline at end of file