You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/10/26 16:13:45 UTC

tinkerpop git commit: Really simplified UnshadedKryoShimService and IoRegistryAwareKryoSerializer. Also, introduced a synchronization point in KryoServiceLoader.applyConfiguration() as I believe that multiple threads are creating a service over and over

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1389 db8e8c106 -> a97ba5745


Really simplified UnshadedKryoShimService and IoRegistryAwareKryoSerializer. Also, introduced a synchronization point in KryoServiceLoader.applyConfiguration() as I believe that multiple threads are creating a service over and over again. Hopefully this doesn't create a bottle neck. Going to test on the cluster.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/a97ba574
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/a97ba574
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/a97ba574

Branch: refs/heads/TINKERPOP-1389
Commit: a97ba5745289b13a637de5437e87e9f6126e454f
Parents: db8e8c1
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Oct 26 10:13:40 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Oct 26 10:13:40 2016 -0600

----------------------------------------------------------------------
 .../io/gryo/kryoshim/KryoShimServiceLoader.java |  8 +--
 .../io/gryo/IoRegistryAwareKryoSerializer.java  | 70 ++++----------------
 .../unshaded/UnshadedKryoShimService.java       | 54 +++++----------
 .../SparkHadoopGraphGryoSerializerProvider.java |  7 +-
 .../computer/SparkHadoopGraphProvider.java      |  7 +-
 .../structure/io/SparkContextStorageCheck.java  | 11 +--
 6 files changed, 48 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a97ba574/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
index 0051204..5f50f9e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
@@ -48,13 +48,13 @@ public class KryoShimServiceLoader {
      */
     public static final String KRYO_SHIM_SERVICE = "gremlin.io.kryoShimService";
 
-    public static void applyConfiguration(final Configuration configuration) {
+    public synchronized static void applyConfiguration(final Configuration configuration) {
         if (null == KryoShimServiceLoader.configuration ||
+                null == KryoShimServiceLoader.cachedShimService ||
                 !ConfigurationUtils.toString(KryoShimServiceLoader.configuration).equals(ConfigurationUtils.toString(configuration))) {
             KryoShimServiceLoader.configuration = configuration;
             load(true);
-        } else
-            load(false);
+        }
     }
 
     /**
@@ -114,7 +114,7 @@ public class KryoShimServiceLoader {
             throw new IllegalStateException("Unable to load KryoShimService");
 
         // once the shim service is defined, configure it
-        log.info("Configuring KryoShimService {} with following configuration: {}",
+        log.info("Configuring KryoShimService {} with following configuration:\n####################\n{}\n####################",
                 cachedShimService.getClass().getCanonicalName(),
                 ConfigurationUtils.toString(configuration));
         cachedShimService.applyConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a97ba574/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
index bf71fae..6d9b536 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
@@ -25,92 +25,48 @@
 package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
 
 import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
 import org.apache.spark.SparkConf;
 import org.apache.spark.serializer.KryoSerializer;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter;
 import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
-import org.javatuples.Pair;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
+import java.util.Arrays;
 
 /**
  * A {@link KryoSerializer} that attempts to honor {@link GryoPool#CONFIG_IO_REGISTRY}.
  */
 public class IoRegistryAwareKryoSerializer extends KryoSerializer {
 
-    private final SparkConf conf;
+    private final SparkConf configuration;
 
     private static final Logger log = LoggerFactory.getLogger(IoRegistryAwareKryoSerializer.class);
 
-    public IoRegistryAwareKryoSerializer(final SparkConf conf) {
-        super(conf);
+    public IoRegistryAwareKryoSerializer(final SparkConf configuration) {
+        super(configuration);
         // store conf so that we can access its registry (if one is present) in newKryo()
-        this.conf = conf;
+        this.configuration = configuration;
     }
 
     @Override
     public Kryo newKryo() {
         final Kryo kryo = super.newKryo();
-
         return applyIoRegistryIfPresent(kryo);
     }
 
     private Kryo applyIoRegistryIfPresent(final Kryo kryo) {
-        if (!conf.contains(GryoPool.CONFIG_IO_REGISTRY)) {
-            log.info("SparkConf {} does not contain setting {}, skipping {} handling",
-                    GryoPool.CONFIG_IO_REGISTRY, conf, IoRegistry.class.getCanonicalName());
+        if (!this.configuration.contains(GryoPool.CONFIG_IO_REGISTRY)) {
+            log.info("SparkConf does not contain setting {}, skipping {} handling", GryoPool.CONFIG_IO_REGISTRY, IoRegistry.class.getCanonicalName());
             return kryo;
         }
-
-        final String registryClassnames = conf.get(GryoPool.CONFIG_IO_REGISTRY);
-
-        for (String registryClassname : registryClassnames.split(",")) {
-            final IoRegistry registry;
-
-            try {
-                registry = (IoRegistry) Class.forName(registryClassname).newInstance();
-                log.info("Instantiated {}", registryClassname);
-            } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
-                log.error("Unable to reflectively instantiate the {} implementation named {}",
-                        IoRegistry.class.getCanonicalName(), registryClassname, e);
-                return kryo;
-            }
-
-            // Left is the class targeted for serialization, right is a mess of potential types, including
-            // a shaded Serializer impl, unshaded Serializer impl, or Function<shaded.Kryo,shaded.Serializer>
-            final List<Pair<Class, Object>> serializers = registry.find(GryoIo.class);
-
-            if (null == serializers) {
-                log.info("Invoking find({}.class) returned null on registry {}; ignoring this registry",
-                        GryoIo.class.getCanonicalName(), registry);
-                return kryo;
-            }
-
-            for (Pair<Class, Object> p : serializers) {
-                if (null == p.getValue1()) {
-                    // null on the right is fine
-                    log.info("Registering {} with default serializer", p.getValue0());
-                    kryo.register(p.getValue0());
-                } else if (p.getValue1() instanceof Serializer) {
-                    // unshaded serializer on the right is fine
-                    log.info("Registering {} with serializer {}", p.getValue0(), p.getValue1());
-                    kryo.register(p.getValue0(), (Serializer) p.getValue1());
-                } else {
-                    // anything else on the right is unsupported with Spark
-                    log.error("Serializer {} found in {} must implement {} " +
-                                    "(the shaded interface {} is not supported on Spark).  This class will be registered with " +
-                                    "the default behavior of Spark's KryoSerializer.",
-                            p.getValue1(), registryClassname, Serializer.class.getCanonicalName(),
-                            org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
-                    kryo.register(p.getValue0());
-                }
-            }
+        final GryoPool pool = GryoPool.build().poolSize(1).ioRegistries(Arrays.asList(this.configuration.get(GryoPool.CONFIG_IO_REGISTRY).split(","))).create();
+        for (final TypeRegistration<?> type : pool.getMapper().getTypeRegistrations()) {
+            log.info("Registering {} with serializer {} and id {}", type.getTargetClass().getCanonicalName(), type.getSerializerShim(), type.getId());
+            kryo.register(type.getTargetClass(), new UnshadedSerializerAdapter<>(type.getSerializerShim()), type.getId());
         }
-
         return kryo;
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a97ba574/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
index 2b0efda..4932acb 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
@@ -27,15 +27,13 @@ package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.spark.SparkConf;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.spark.structure.Spark;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.IoRegistryAwareKryoSerializer;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -43,22 +41,20 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 public class UnshadedKryoShimService implements KryoShimService {
 
-    private static final Logger log = LoggerFactory.getLogger(UnshadedKryoShimService.class);
     private static final LinkedBlockingQueue<Kryo> KRYOS = new LinkedBlockingQueue<>();
-    private static volatile boolean initialized;
+    private static volatile boolean INITIALIZED;
 
     @Override
     public Object readClassAndObject(final InputStream inputStream) {
-        final LinkedBlockingQueue<Kryo> kryos = initialize();
         Kryo k = null;
         try {
-            k = kryos.take();
+            k = KRYOS.take();
             return k.readClassAndObject(new Input(inputStream));
         } catch (final InterruptedException e) {
             throw new IllegalStateException(e);
         } finally {
             try {
-                kryos.put(k);
+                KRYOS.put(k);
             } catch (final InterruptedException e) {
                 throw new IllegalStateException(e);
             }
@@ -67,10 +63,9 @@ public class UnshadedKryoShimService implements KryoShimService {
 
     @Override
     public void writeClassAndObject(final Object object, OutputStream outputStream) {
-        final LinkedBlockingQueue<Kryo> kryos = initialize();
         Kryo k = null;
         try {
-            k = kryos.take();
+            k = KRYOS.take();
             final Output kryoOutput = new Output(outputStream);
             k.writeClassAndObject(kryoOutput, object);
             kryoOutput.flush();
@@ -78,7 +73,7 @@ public class UnshadedKryoShimService implements KryoShimService {
             throw new IllegalStateException(e);
         } finally {
             try {
-                kryos.put(k);
+                KRYOS.put(k);
             } catch (final InterruptedException e) {
                 throw new IllegalStateException(e);
             }
@@ -95,44 +90,25 @@ public class UnshadedKryoShimService implements KryoShimService {
         initialize(configuration);
     }
 
-    private LinkedBlockingQueue<Kryo> initialize() {
-        return initialize(new BaseConfiguration());
-    }
-
     private LinkedBlockingQueue<Kryo> initialize(final Configuration configuration) {
         // DCL is safe in this case due to volatility
-        if (!initialized) {
+        if (!INITIALIZED) {
             synchronized (UnshadedKryoShimService.class) {
-                if (!initialized) {
-                    final SparkConf sparkConf = new SparkConf();
-
-                    // Copy the user's IoRegistry from the param conf to the SparkConf we just created
-                    final String regStr = configuration.getString(GryoPool.CONFIG_IO_REGISTRY, null);
-                    if (null != regStr)  // SparkConf rejects null values with NPE, so this has to be checked before set(...)
-                        sparkConf.set(GryoPool.CONFIG_IO_REGISTRY, regStr);
-
+                if (!INITIALIZED) {
+                    // so we don't get a WARN that a new configuration is being created within an active context
+                    final SparkConf sparkConf = null == Spark.getContext() ? new SparkConf() : Spark.getContext().getConf().clone();
+                    configuration.getKeys().forEachRemaining(key -> sparkConf.set(key, configuration.getProperty(key).toString()));
                     // Setting spark.serializer here almost certainly isn't necessary, but it doesn't hurt
                     sparkConf.set(Constants.SPARK_SERIALIZER, IoRegistryAwareKryoSerializer.class.getCanonicalName());
-                    final String registrator = configuration.getString(Constants.SPARK_KRYO_REGISTRATOR);
-                    if (null != registrator) {
-                        sparkConf.set(Constants.SPARK_KRYO_REGISTRATOR, registrator);
-                        log.info("Copied " + Constants.SPARK_KRYO_REGISTRATOR + ": {}", registrator);
-                    } else {
-                        log.info("Not copying " + Constants.SPARK_KRYO_REGISTRATOR);
-                    }
-
                     // Instantiate the spark.serializer
-                    final IoRegistryAwareKryoSerializer ioReg = new IoRegistryAwareKryoSerializer(sparkConf);
-
+                    final IoRegistryAwareKryoSerializer ioRegistrySerializer = new IoRegistryAwareKryoSerializer(sparkConf);
                     // Setup a pool backed by our spark.serializer instance
                     // Reuse Gryo poolsize for Kryo poolsize (no need to copy this to SparkConf)
-                    final int poolSize = configuration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE,
-                            GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
+                    final int poolSize = configuration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
                     for (int i = 0; i < poolSize; i++) {
-                        KRYOS.add(ioReg.newKryo());
+                        KRYOS.add(ioRegistrySerializer.newKryo());
                     }
-
-                    initialized = true;
+                    INITIALIZED = true;
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a97ba574/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
index 19b9121..0e7fe0d 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
@@ -21,10 +21,8 @@ package org.apache.tinkerpop.gremlin.spark.process.computer;
 
 import org.apache.tinkerpop.gremlin.LoadGraphWith;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 
 import java.util.Map;
 
@@ -34,7 +32,10 @@ import java.util.Map;
 public final class SparkHadoopGraphGryoSerializerProvider extends SparkHadoopGraphProvider {
 
     public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
-        Spark.close();
+        if (!SparkHadoopGraphGryoSerializerProvider.class.getCanonicalName().equals(System.getProperty(PREVIOUS_SPARK_PROVIDER, null))) {
+            Spark.close();
+            System.setProperty(PREVIOUS_SPARK_PROVIDER, SparkHadoopGraphGryoSerializerProvider.class.getCanonicalName());
+        }
         final Map<String, Object> config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith);
         config.put(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
         config.remove(Constants.SPARK_KRYO_REGISTRATOR);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a97ba574/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index 878fd1e..8385610 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -52,9 +52,14 @@ import java.util.Map;
 @GraphProvider.Descriptor(computer = SparkGraphComputer.class)
 public class SparkHadoopGraphProvider extends HadoopGraphProvider {
 
+    protected static final String PREVIOUS_SPARK_PROVIDER = "previous.spark.provider";
+
     @Override
     public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
-        Spark.close();
+        if (this.getClass().equals(SparkHadoopGraphProvider.class) && !SparkHadoopGraphProvider.class.getCanonicalName().equals(System.getProperty(PREVIOUS_SPARK_PROVIDER, null))) {
+            Spark.close();
+            System.setProperty(PREVIOUS_SPARK_PROVIDER, SparkHadoopGraphProvider.class.getCanonicalName());
+        }
 
         final Map<String, Object> config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith);
         config.put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);  // this makes the test suite go really fast

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a97ba574/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java
index f9e5172..614b7b9 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java
@@ -19,6 +19,7 @@
 
 package org.apache.tinkerpop.gremlin.spark.structure.io;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.tinkerpop.gremlin.LoadGraphWith;
@@ -52,7 +53,7 @@ public class SparkContextStorageCheck extends AbstractStorageCheck {
     @Test
     @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
     public void shouldSupportHeadMethods() throws Exception {
-        final Storage storage = SparkContextStorage.open("local[4]");
+        final Storage storage = SparkContextStorage.open(graph.configuration());
         final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
         super.checkHeadMethods(storage, graph.configuration().getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION), outputLocation, PersistedInputRDD.class, PersistedInputRDD.class);
     }
@@ -60,7 +61,7 @@ public class SparkContextStorageCheck extends AbstractStorageCheck {
     @Test
     @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
     public void shouldSupportRemoveAndListMethods() throws Exception {
-        final Storage storage = SparkContextStorage.open("local[4]");
+        final Storage storage = SparkContextStorage.open(graph.configuration());
         final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
         super.checkRemoveAndListMethods(storage, outputLocation);
     }
@@ -68,7 +69,7 @@ public class SparkContextStorageCheck extends AbstractStorageCheck {
     @Test
     @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
     public void shouldSupportCopyMethods() throws Exception {
-        final Storage storage = SparkContextStorage.open("local[4]");
+        final Storage storage = SparkContextStorage.open(graph.configuration());
         final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
         final String newOutputLocation = "new-location-for-copy";
         super.checkCopyMethods(storage, outputLocation, newOutputLocation, PersistedInputRDD.class, PersistedInputRDD.class);
@@ -77,14 +78,14 @@ public class SparkContextStorageCheck extends AbstractStorageCheck {
     @Test
     @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
     public void shouldNotHaveResidualDataInStorage() throws Exception {
-        final Storage storage = SparkContextStorage.open("local[4]");
+        final Storage storage = SparkContextStorage.open(graph.configuration());
         final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
         super.checkResidualDataInStorage(storage, outputLocation);
     }
 
     @Test
     public void shouldSupportDirectoryFileDistinction() throws Exception {
-        final Storage storage = SparkContextStorage.open("local[4]");
+        final Storage storage = SparkContextStorage.open(graph.configuration());
         for (int i = 0; i < 10; i++) {
             JavaSparkContext.fromSparkContext(Spark.getContext()).emptyRDD().setName("directory1/file1-" + i + ".txt.bz").persist(StorageLevel.DISK_ONLY());
         }