You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/11 09:20:41 UTC
[17/50] incubator-ignite git commit: IGNITE-389 - IPC checked and API
improvements.
IGNITE-389 - IPC checked and API improvements.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6b51f99e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6b51f99e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6b51f99e
Branch: refs/heads/ignite-998
Commit: 6b51f99e72eb11af25403f8ec50087c03b1f1fb7
Parents: 1d8643c
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Thu Jun 4 19:19:36 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Thu Jun 4 19:19:36 2015 -0700
----------------------------------------------------------------------
.../ignite/internal/util/IgniteUtils.java | 4 +-
.../shmem/IpcSharedMemoryClientEndpoint.java | 2 +-
.../ipc/shmem/IpcSharedMemoryNativeLoader.java | 150 +++++++++++++++++--
.../shmem/IpcSharedMemoryServerEndpoint.java | 2 +-
.../IpcSharedMemoryCrashDetectionSelfTest.java | 2 +-
.../ipc/shmem/IpcSharedMemorySpaceSelfTest.java | 2 +-
.../ipc/shmem/IpcSharedMemoryUtilsSelfTest.java | 2 +-
.../LoadWithCorruptedLibFileTestRunner.java | 2 +-
.../IpcSharedMemoryBenchmarkReader.java | 2 +-
.../IpcSharedMemoryBenchmarkWriter.java | 2 +-
.../hadoop/HadoopAbstractSelfTest.java | 1 +
.../org/apache/ignite/spark/IgniteContext.scala | 19 ++-
.../org/apache/ignite/spark/IgniteRDD.scala | 8 +-
13 files changed, 171 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 0932212..9016b10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -9025,11 +9025,11 @@ public abstract class IgniteUtils {
hasShmem = false;
else {
try {
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(null);
hasShmem = true;
}
- catch (IgniteCheckedException e) {
+ catch (IgniteCheckedException ignore) {
hasShmem = false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
index 27a234f..c935c4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
@@ -112,7 +112,7 @@ public class IpcSharedMemoryClientEndpoint implements IpcEndpoint {
boolean clear = true;
try {
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(log);
sock.connect(new InetSocketAddress("127.0.0.1", port), timeout);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
index dc00ca6..8c345f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.util.ipc.shmem;
import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import java.io.*;
@@ -25,6 +26,8 @@ import java.net.*;
import java.nio.channels.*;
import java.security.*;
import java.util.*;
+import java.util.jar.*;
+import java.util.zip.*;
import static org.apache.ignite.internal.IgniteVersionUtils.*;
@@ -36,6 +39,9 @@ public class IpcSharedMemoryNativeLoader {
/** Library name base. */
private static final String LIB_NAME_BASE = "igniteshmem";
+ /** Library jar name base. */
+ private static final String JAR_NAME_BASE = "shmem";
+
/** Library name. */
static final String LIB_NAME = LIB_NAME_BASE + "-" + VER_STR;
@@ -84,9 +90,10 @@ public class IpcSharedMemoryNativeLoader {
}
/**
+ * @param log Logger, if available. If null, warnings will be printed out to console.
* @throws IgniteCheckedException If failed.
*/
- public static void load() throws IgniteCheckedException {
+ public static void load(IgniteLogger log) throws IgniteCheckedException {
if (loaded)
return;
@@ -94,7 +101,7 @@ public class IpcSharedMemoryNativeLoader {
if (loaded)
return;
- doLoad();
+ doLoad(log);
loaded = true;
}
@@ -103,7 +110,7 @@ public class IpcSharedMemoryNativeLoader {
/**
* @throws IgniteCheckedException If failed.
*/
- private static void doLoad() throws IgniteCheckedException {
+ private static void doLoad(IgniteLogger log) throws IgniteCheckedException {
assert Thread.holdsLock(IpcSharedMemoryNativeLoader.class);
Collection<Throwable> errs = new ArrayList<>();
@@ -124,7 +131,7 @@ public class IpcSharedMemoryNativeLoader {
// Obtain lock on file to prevent concurrent extracts.
try (RandomAccessFile randomAccessFile = new RandomAccessFile(lockFile, "rws");
- FileLock ignored = randomAccessFile.getChannel().lock()) {
+ FileLock ignored = randomAccessFile.getChannel().lock()) {
if (extractAndLoad(errs, tmpDir, platformSpecificResourcePath()))
return;
@@ -134,6 +141,30 @@ public class IpcSharedMemoryNativeLoader {
if (extractAndLoad(errs, tmpDir, resourcePath()))
return;
+ try {
+ U.quietAndWarn(log, "Failed to load 'igniteshmem' library from classpath. Will try to load it from IGNITE_HOME.");
+
+ String igniteHome = X.resolveIgniteHome();
+
+ File shmemJar = findShmemJar(errs, igniteHome);
+
+ if (shmemJar != null) {
+ try (JarFile jar = new JarFile(shmemJar, false, JarFile.OPEN_READ)) {
+ if (extractAndLoad(errs, jar, tmpDir, platformSpecificResourcePath()))
+ return;
+
+ if (extractAndLoad(errs, jar, tmpDir, osSpecificResourcePath()))
+ return;
+
+ if (extractAndLoad(errs, jar, tmpDir, resourcePath()))
+ return;
+ }
+ }
+ }
+ catch (IgniteCheckedException ignore) {
+
+ }
+
// Failed to find the library.
assert !errs.isEmpty();
@@ -145,6 +176,32 @@ public class IpcSharedMemoryNativeLoader {
}
/**
+ * Tries to find shmem jar in IGNITE_HOME/libs folder.
+ *
+ * @param errs Collection of errors to add readable exception to.
+ * @param igniteHome Resolver IGNITE_HOME variable.
+ * @return File, if found.
+ */
+ private static File findShmemJar(Collection<Throwable> errs, String igniteHome) {
+ File libs = new File(igniteHome, "libs");
+
+ if (!libs.exists() || libs.isFile()) {
+ errs.add(new IllegalStateException("Failed to find libs folder in resolved IGNITE_HOME: " + igniteHome));
+
+ return null;
+ }
+
+ for (File lib : libs.listFiles()) {
+ if (lib.getName().endsWith(".jar") && lib.getName().contains(JAR_NAME_BASE))
+ return lib;
+ }
+
+ errs.add(new IllegalStateException("Failed to find shmem jar in resolved IGNITE_HOME: " + igniteHome));
+
+ return null;
+ }
+
+ /**
* Gets temporary directory unique for each OS user.
* The directory guaranteed to exist, though may not be empty.
*/
@@ -220,6 +277,24 @@ public class IpcSharedMemoryNativeLoader {
/**
* @param errs Errors collection.
+ * @param rsrcPath Path.
+ * @return {@code True} if library was found and loaded.
+ */
+ private static boolean extractAndLoad(Collection<Throwable> errs, JarFile jar, File tmpDir, String rsrcPath) {
+ ZipEntry rsrc = jar.getEntry(rsrcPath);
+
+ if (rsrc != null)
+ return extract(errs, rsrc, jar, new File(tmpDir, mapLibraryName(LIB_NAME)));
+ else {
+ errs.add(new IllegalStateException("Failed to find resource within specified jar file " +
+ "[rsrc=" + rsrcPath + ", jar=" + jar.getName() + ']'));
+
+ return false;
+ }
+ }
+
+ /**
+ * @param errs Errors collection.
* @param src Source.
* @param target Target.
* @return {@code True} if resource was found and loaded.
@@ -230,7 +305,7 @@ public class IpcSharedMemoryNativeLoader {
InputStream is = null;
try {
- if (!target.exists() || !haveEqualMD5(target, src)) {
+ if (!target.exists() || !haveEqualMD5(target, src.openStream())) {
is = src.openStream();
if (is != null) {
@@ -265,20 +340,69 @@ public class IpcSharedMemoryNativeLoader {
}
/**
- * @param target Target.
+ * @param errs Errors collection.
* @param src Source.
+ * @param target Target.
+ * @return {@code True} if resource was found and loaded.
+ */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ private static boolean extract(Collection<Throwable> errs, ZipEntry src, JarFile jar, File target) {
+ FileOutputStream os = null;
+ InputStream is = null;
+
+ try {
+ if (!target.exists() || !haveEqualMD5(target, jar.getInputStream(src))) {
+ is = jar.getInputStream(src);
+
+ if (is != null) {
+ os = new FileOutputStream(target);
+
+ int read;
+
+ byte[] buf = new byte[4096];
+
+ while ((read = is.read(buf)) != -1)
+ os.write(buf, 0, read);
+ }
+ }
+
+ // chmod 775.
+ if (!U.isWindows())
+ Runtime.getRuntime().exec(new String[] {"chmod", "775", target.getCanonicalPath()}).waitFor();
+
+ System.load(target.getPath());
+
+ return true;
+ }
+ catch (IOException | UnsatisfiedLinkError | InterruptedException | NoSuchAlgorithmException e) {
+ errs.add(e);
+ }
+ finally {
+ U.closeQuiet(os);
+ U.closeQuiet(is);
+ }
+
+ return false;
+ }
+
+ /**
+ * @param target Target.
+ * @param srcIS Source input stream.
* @return {@code True} if target md5-sum equal to source md5-sum.
* @throws NoSuchAlgorithmException If md5 algorithm was not found.
* @throws IOException If an I/O exception occurs.
*/
- private static boolean haveEqualMD5(File target, URL src) throws NoSuchAlgorithmException, IOException {
- try (InputStream targetIS = new FileInputStream(target);
- InputStream srcIS = src.openStream()) {
-
- String targetMD5 = U.calculateMD5(targetIS);
- String srcMD5 = U.calculateMD5(srcIS);
+ private static boolean haveEqualMD5(File target, InputStream srcIS) throws NoSuchAlgorithmException, IOException {
+ try {
+ try (InputStream targetIS = new FileInputStream(target)) {
+ String targetMD5 = U.calculateMD5(targetIS);
+ String srcMD5 = U.calculateMD5(srcIS);
- return targetMD5.equals(srcMD5);
+ return targetMD5.equals(srcMD5);
+ }
+ }
+ finally {
+ srcIS.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
index 5185856..102c5b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
@@ -146,7 +146,7 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint {
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(log);
pid = IpcSharedMemoryUtils.pid();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
index 2ddf6f3..c6f590e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
@@ -42,7 +42,7 @@ public class IpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTes
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(log());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
index 7dc0870..4afb64b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
@@ -51,7 +51,7 @@ public class IpcSharedMemorySpaceSelfTest extends GridCommonAbstractTest {
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(log());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
index 4c5413c..176429e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
@@ -31,7 +31,7 @@ public class IpcSharedMemoryUtilsSelfTest extends GridCommonAbstractTest {
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(log());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
index 8ff827b..8fee239 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
@@ -37,7 +37,7 @@ public class LoadWithCorruptedLibFileTestRunner {
createCorruptedLibFile();
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(null);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
index 28495af..89eeda1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
@@ -43,7 +43,7 @@ public class IpcSharedMemoryBenchmarkReader implements IpcSharedMemoryBenchmarkP
* @throws IgniteCheckedException If failed.
*/
public static void main(String[] args) throws IgniteCheckedException {
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(null);
int nThreads = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
index 2ade145..e8a8402 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
@@ -42,7 +42,7 @@ public class IpcSharedMemoryBenchmarkWriter implements IpcSharedMemoryBenchmarkP
* @throws IgniteCheckedException If failed.
*/
public static void main(String[] args) throws IgniteCheckedException {
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(null);
int nThreads = args.length > 0 ? Integer.parseInt(args[0]) : 1;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
index 517a587..a3c9bde 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.configuration.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
import org.apache.ignite.internal.processors.hadoop.fs.*;
+import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 5cdbad0..2cfebd6 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -34,8 +34,23 @@ import org.apache.spark.sql.SQLContext
*/
class IgniteContext[K, V](
@scala.transient val sparkContext: SparkContext,
- cfgF: () ⇒ IgniteConfiguration
+ cfgF: () ⇒ IgniteConfiguration,
+ client: Boolean = true
) extends Serializable {
+ @scala.transient private val driver = true
+
+ if (!client) {
+ val workers = sparkContext.getExecutorStorageStatus.length - 1
+
+ if (workers <= 0)
+ throw new IllegalStateException("No Spark executors found to start Ignite nodes.")
+
+ println("Will start Ignite nodes on " + workers + " workers")
+
+ // Start ignite server node on each worker in server mode.
+ sparkContext.parallelize(1 to workers, workers).foreach(it ⇒ ignite())
+ }
+
def this(
sc: SparkContext,
springUrl: String
@@ -62,7 +77,7 @@ class IgniteContext[K, V](
catch {
case e: Exception ⇒
try {
- igniteCfg.setClientMode(true)
+ igniteCfg.setClientMode(client || driver)
Ignition.start(igniteCfg)
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 0b8e845..0d1a3be 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -114,7 +114,7 @@ class IgniteRDD[K, V] (
ic.sqlContext.createDataFrame(rowRdd, schema)
}
- def saveValues(rdd: RDD[V]) = {
+ def saveValues(rdd: RDD[V], overwrite: Boolean = false) = {
rdd.foreachPartition(it ⇒ {
val ig = ic.ignite()
@@ -127,6 +127,8 @@ class IgniteRDD[K, V] (
val streamer = ig.dataStreamer[Object, V](cacheName)
try {
+ streamer.allowOverwrite(overwrite)
+
it.foreach(value ⇒ {
val key = affinityKeyFunc(value, node.orNull)
@@ -139,7 +141,7 @@ class IgniteRDD[K, V] (
})
}
- def savePairs(rdd: RDD[(K, V)]) = {
+ def savePairs(rdd: RDD[(K, V)], overwrite: Boolean = false) = {
rdd.foreachPartition(it ⇒ {
val ig = ic.ignite()
@@ -149,6 +151,8 @@ class IgniteRDD[K, V] (
val streamer = ig.dataStreamer[K, V](cacheName)
try {
+ streamer.allowOverwrite(overwrite)
+
it.foreach(tup ⇒ {
streamer.addData(tup._1, tup._2)
})