You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/10/26 16:13:45 UTC
tinkerpop git commit: Really simplified UnshadedKryoShimService and
IoRegistryAwareKryoSerializer. Also,
introduced a synchronization point in KryoShimServiceLoader.applyConfiguration()
as I believe that multiple threads are creating a service over and over
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1389 db8e8c106 -> a97ba5745
Really simplified UnshadedKryoShimService and IoRegistryAwareKryoSerializer. Also, introduced a synchronization point in KryoShimServiceLoader.applyConfiguration() as I believe that multiple threads are creating a service over and over again. Hopefully this doesn't create a bottleneck. Going to test on the cluster.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/a97ba574
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/a97ba574
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/a97ba574
Branch: refs/heads/TINKERPOP-1389
Commit: a97ba5745289b13a637de5437e87e9f6126e454f
Parents: db8e8c1
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Oct 26 10:13:40 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Oct 26 10:13:40 2016 -0600
----------------------------------------------------------------------
.../io/gryo/kryoshim/KryoShimServiceLoader.java | 8 +--
.../io/gryo/IoRegistryAwareKryoSerializer.java | 70 ++++----------------
.../unshaded/UnshadedKryoShimService.java | 54 +++++----------
.../SparkHadoopGraphGryoSerializerProvider.java | 7 +-
.../computer/SparkHadoopGraphProvider.java | 7 +-
.../structure/io/SparkContextStorageCheck.java | 11 +--
6 files changed, 48 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a97ba574/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
index 0051204..5f50f9e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
@@ -48,13 +48,13 @@ public class KryoShimServiceLoader {
*/
public static final String KRYO_SHIM_SERVICE = "gremlin.io.kryoShimService";
- public static void applyConfiguration(final Configuration configuration) {
+ public synchronized static void applyConfiguration(final Configuration configuration) {
if (null == KryoShimServiceLoader.configuration ||
+ null == KryoShimServiceLoader.cachedShimService ||
!ConfigurationUtils.toString(KryoShimServiceLoader.configuration).equals(ConfigurationUtils.toString(configuration))) {
KryoShimServiceLoader.configuration = configuration;
load(true);
- } else
- load(false);
+ }
}
/**
@@ -114,7 +114,7 @@ public class KryoShimServiceLoader {
throw new IllegalStateException("Unable to load KryoShimService");
// once the shim service is defined, configure it
- log.info("Configuring KryoShimService {} with following configuration: {}",
+ log.info("Configuring KryoShimService {} with following configuration:\n####################\n{}\n####################",
cachedShimService.getClass().getCanonicalName(),
ConfigurationUtils.toString(configuration));
cachedShimService.applyConfiguration(configuration);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a97ba574/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
index bf71fae..6d9b536 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
@@ -25,92 +25,48 @@
package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoSerializer;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter;
import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
-import org.javatuples.Pair;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
+import java.util.Arrays;
/**
* A {@link KryoSerializer} that attempts to honor {@link GryoPool#CONFIG_IO_REGISTRY}.
*/
public class IoRegistryAwareKryoSerializer extends KryoSerializer {
- private final SparkConf conf;
+ private final SparkConf configuration;
private static final Logger log = LoggerFactory.getLogger(IoRegistryAwareKryoSerializer.class);
- public IoRegistryAwareKryoSerializer(final SparkConf conf) {
- super(conf);
+ public IoRegistryAwareKryoSerializer(final SparkConf configuration) {
+ super(configuration);
// store conf so that we can access its registry (if one is present) in newKryo()
- this.conf = conf;
+ this.configuration = configuration;
}
@Override
public Kryo newKryo() {
final Kryo kryo = super.newKryo();
-
return applyIoRegistryIfPresent(kryo);
}
private Kryo applyIoRegistryIfPresent(final Kryo kryo) {
- if (!conf.contains(GryoPool.CONFIG_IO_REGISTRY)) {
- log.info("SparkConf {} does not contain setting {}, skipping {} handling",
- GryoPool.CONFIG_IO_REGISTRY, conf, IoRegistry.class.getCanonicalName());
+ if (!this.configuration.contains(GryoPool.CONFIG_IO_REGISTRY)) {
+ log.info("SparkConf does not contain setting {}, skipping {} handling", GryoPool.CONFIG_IO_REGISTRY, IoRegistry.class.getCanonicalName());
return kryo;
}
-
- final String registryClassnames = conf.get(GryoPool.CONFIG_IO_REGISTRY);
-
- for (String registryClassname : registryClassnames.split(",")) {
- final IoRegistry registry;
-
- try {
- registry = (IoRegistry) Class.forName(registryClassname).newInstance();
- log.info("Instantiated {}", registryClassname);
- } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
- log.error("Unable to reflectively instantiate the {} implementation named {}",
- IoRegistry.class.getCanonicalName(), registryClassname, e);
- return kryo;
- }
-
- // Left is the class targeted for serialization, right is a mess of potential types, including
- // a shaded Serializer impl, unshaded Serializer impl, or Function<shaded.Kryo,shaded.Serializer>
- final List<Pair<Class, Object>> serializers = registry.find(GryoIo.class);
-
- if (null == serializers) {
- log.info("Invoking find({}.class) returned null on registry {}; ignoring this registry",
- GryoIo.class.getCanonicalName(), registry);
- return kryo;
- }
-
- for (Pair<Class, Object> p : serializers) {
- if (null == p.getValue1()) {
- // null on the right is fine
- log.info("Registering {} with default serializer", p.getValue0());
- kryo.register(p.getValue0());
- } else if (p.getValue1() instanceof Serializer) {
- // unshaded serializer on the right is fine
- log.info("Registering {} with serializer {}", p.getValue0(), p.getValue1());
- kryo.register(p.getValue0(), (Serializer) p.getValue1());
- } else {
- // anything else on the right is unsupported with Spark
- log.error("Serializer {} found in {} must implement {} " +
- "(the shaded interface {} is not supported on Spark). This class will be registered with " +
- "the default behavior of Spark's KryoSerializer.",
- p.getValue1(), registryClassname, Serializer.class.getCanonicalName(),
- org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
- kryo.register(p.getValue0());
- }
- }
+ final GryoPool pool = GryoPool.build().poolSize(1).ioRegistries(Arrays.asList(this.configuration.get(GryoPool.CONFIG_IO_REGISTRY).split(","))).create();
+ for (final TypeRegistration<?> type : pool.getMapper().getTypeRegistrations()) {
+ log.info("Registering {} with serializer {} and id {}", type.getTargetClass().getCanonicalName(), type.getSerializerShim(), type.getId());
+ kryo.register(type.getTargetClass(), new UnshadedSerializerAdapter<>(type.getSerializerShim()), type.getId());
}
-
return kryo;
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a97ba574/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
index 2b0efda..4932acb 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
@@ -27,15 +27,13 @@ package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
-import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.spark.SparkConf;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.spark.structure.Spark;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.IoRegistryAwareKryoSerializer;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.io.OutputStream;
@@ -43,22 +41,20 @@ import java.util.concurrent.LinkedBlockingQueue;
public class UnshadedKryoShimService implements KryoShimService {
- private static final Logger log = LoggerFactory.getLogger(UnshadedKryoShimService.class);
private static final LinkedBlockingQueue<Kryo> KRYOS = new LinkedBlockingQueue<>();
- private static volatile boolean initialized;
+ private static volatile boolean INITIALIZED;
@Override
public Object readClassAndObject(final InputStream inputStream) {
- final LinkedBlockingQueue<Kryo> kryos = initialize();
Kryo k = null;
try {
- k = kryos.take();
+ k = KRYOS.take();
return k.readClassAndObject(new Input(inputStream));
} catch (final InterruptedException e) {
throw new IllegalStateException(e);
} finally {
try {
- kryos.put(k);
+ KRYOS.put(k);
} catch (final InterruptedException e) {
throw new IllegalStateException(e);
}
@@ -67,10 +63,9 @@ public class UnshadedKryoShimService implements KryoShimService {
@Override
public void writeClassAndObject(final Object object, OutputStream outputStream) {
- final LinkedBlockingQueue<Kryo> kryos = initialize();
Kryo k = null;
try {
- k = kryos.take();
+ k = KRYOS.take();
final Output kryoOutput = new Output(outputStream);
k.writeClassAndObject(kryoOutput, object);
kryoOutput.flush();
@@ -78,7 +73,7 @@ public class UnshadedKryoShimService implements KryoShimService {
throw new IllegalStateException(e);
} finally {
try {
- kryos.put(k);
+ KRYOS.put(k);
} catch (final InterruptedException e) {
throw new IllegalStateException(e);
}
@@ -95,44 +90,25 @@ public class UnshadedKryoShimService implements KryoShimService {
initialize(configuration);
}
- private LinkedBlockingQueue<Kryo> initialize() {
- return initialize(new BaseConfiguration());
- }
-
private LinkedBlockingQueue<Kryo> initialize(final Configuration configuration) {
// DCL is safe in this case due to volatility
- if (!initialized) {
+ if (!INITIALIZED) {
synchronized (UnshadedKryoShimService.class) {
- if (!initialized) {
- final SparkConf sparkConf = new SparkConf();
-
- // Copy the user's IoRegistry from the param conf to the SparkConf we just created
- final String regStr = configuration.getString(GryoPool.CONFIG_IO_REGISTRY, null);
- if (null != regStr) // SparkConf rejects null values with NPE, so this has to be checked before set(...)
- sparkConf.set(GryoPool.CONFIG_IO_REGISTRY, regStr);
-
+ if (!INITIALIZED) {
+ // so we don't get a WARN that a new configuration is being created within an active context
+ final SparkConf sparkConf = null == Spark.getContext() ? new SparkConf() : Spark.getContext().getConf().clone();
+ configuration.getKeys().forEachRemaining(key -> sparkConf.set(key, configuration.getProperty(key).toString()));
// Setting spark.serializer here almost certainly isn't necessary, but it doesn't hurt
sparkConf.set(Constants.SPARK_SERIALIZER, IoRegistryAwareKryoSerializer.class.getCanonicalName());
- final String registrator = configuration.getString(Constants.SPARK_KRYO_REGISTRATOR);
- if (null != registrator) {
- sparkConf.set(Constants.SPARK_KRYO_REGISTRATOR, registrator);
- log.info("Copied " + Constants.SPARK_KRYO_REGISTRATOR + ": {}", registrator);
- } else {
- log.info("Not copying " + Constants.SPARK_KRYO_REGISTRATOR);
- }
-
// Instantiate the spark.serializer
- final IoRegistryAwareKryoSerializer ioReg = new IoRegistryAwareKryoSerializer(sparkConf);
-
+ final IoRegistryAwareKryoSerializer ioRegistrySerializer = new IoRegistryAwareKryoSerializer(sparkConf);
// Setup a pool backed by our spark.serializer instance
// Reuse Gryo poolsize for Kryo poolsize (no need to copy this to SparkConf)
- final int poolSize = configuration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE,
- GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
+ final int poolSize = configuration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
for (int i = 0; i < poolSize; i++) {
- KRYOS.add(ioReg.newKryo());
+ KRYOS.add(ioRegistrySerializer.newKryo());
}
-
- initialized = true;
+ INITIALIZED = true;
}
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a97ba574/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
index 19b9121..0e7fe0d 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
@@ -21,10 +21,8 @@ package org.apache.tinkerpop.gremlin.spark.process.computer;
import org.apache.tinkerpop.gremlin.LoadGraphWith;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
import java.util.Map;
@@ -34,7 +32,10 @@ import java.util.Map;
public final class SparkHadoopGraphGryoSerializerProvider extends SparkHadoopGraphProvider {
public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
- Spark.close();
+ if (!SparkHadoopGraphGryoSerializerProvider.class.getCanonicalName().equals(System.getProperty(PREVIOUS_SPARK_PROVIDER, null))) {
+ Spark.close();
+ System.setProperty(PREVIOUS_SPARK_PROVIDER, SparkHadoopGraphGryoSerializerProvider.class.getCanonicalName());
+ }
final Map<String, Object> config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith);
config.put(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
config.remove(Constants.SPARK_KRYO_REGISTRATOR);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a97ba574/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index 878fd1e..8385610 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -52,9 +52,14 @@ import java.util.Map;
@GraphProvider.Descriptor(computer = SparkGraphComputer.class)
public class SparkHadoopGraphProvider extends HadoopGraphProvider {
+ protected static final String PREVIOUS_SPARK_PROVIDER = "previous.spark.provider";
+
@Override
public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
- Spark.close();
+ if (this.getClass().equals(SparkHadoopGraphProvider.class) && !SparkHadoopGraphProvider.class.getCanonicalName().equals(System.getProperty(PREVIOUS_SPARK_PROVIDER, null))) {
+ Spark.close();
+ System.setProperty(PREVIOUS_SPARK_PROVIDER, SparkHadoopGraphProvider.class.getCanonicalName());
+ }
final Map<String, Object> config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith);
config.put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); // this makes the test suite go really fast
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a97ba574/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java
index f9e5172..614b7b9 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java
@@ -19,6 +19,7 @@
package org.apache.tinkerpop.gremlin.spark.structure.io;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.tinkerpop.gremlin.LoadGraphWith;
@@ -52,7 +53,7 @@ public class SparkContextStorageCheck extends AbstractStorageCheck {
@Test
@LoadGraphWith(LoadGraphWith.GraphData.MODERN)
public void shouldSupportHeadMethods() throws Exception {
- final Storage storage = SparkContextStorage.open("local[4]");
+ final Storage storage = SparkContextStorage.open(graph.configuration());
final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
super.checkHeadMethods(storage, graph.configuration().getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION), outputLocation, PersistedInputRDD.class, PersistedInputRDD.class);
}
@@ -60,7 +61,7 @@ public class SparkContextStorageCheck extends AbstractStorageCheck {
@Test
@LoadGraphWith(LoadGraphWith.GraphData.MODERN)
public void shouldSupportRemoveAndListMethods() throws Exception {
- final Storage storage = SparkContextStorage.open("local[4]");
+ final Storage storage = SparkContextStorage.open(graph.configuration());
final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
super.checkRemoveAndListMethods(storage, outputLocation);
}
@@ -68,7 +69,7 @@ public class SparkContextStorageCheck extends AbstractStorageCheck {
@Test
@LoadGraphWith(LoadGraphWith.GraphData.MODERN)
public void shouldSupportCopyMethods() throws Exception {
- final Storage storage = SparkContextStorage.open("local[4]");
+ final Storage storage = SparkContextStorage.open(graph.configuration());
final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
final String newOutputLocation = "new-location-for-copy";
super.checkCopyMethods(storage, outputLocation, newOutputLocation, PersistedInputRDD.class, PersistedInputRDD.class);
@@ -77,14 +78,14 @@ public class SparkContextStorageCheck extends AbstractStorageCheck {
@Test
@LoadGraphWith(LoadGraphWith.GraphData.MODERN)
public void shouldNotHaveResidualDataInStorage() throws Exception {
- final Storage storage = SparkContextStorage.open("local[4]");
+ final Storage storage = SparkContextStorage.open(graph.configuration());
final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
super.checkResidualDataInStorage(storage, outputLocation);
}
@Test
public void shouldSupportDirectoryFileDistinction() throws Exception {
- final Storage storage = SparkContextStorage.open("local[4]");
+ final Storage storage = SparkContextStorage.open(graph.configuration());
for (int i = 0; i < 10; i++) {
JavaSparkContext.fromSparkContext(Spark.getContext()).emptyRDD().setName("directory1/file1-" + i + ".txt.bz").persist(StorageLevel.DISK_ONLY());
}