You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/10/25 15:02:38 UTC
tinkerpop git commit: KryoShimService is no longer a META-INF service
with priorities and all that. Instead,
it is now a configuration property. If the user provided configuration DOES
NOT have gremlin.io.kryoShimService, then it defaults to HadoopPoolsS
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1389 532ed59c2 -> 49924cc66
KryoShimService is no longer a META-INF service with priorities and all that. Instead, it is now a configuration property. If the user provided configuration DOES NOT have gremlin.io.kryoShimService, then it defaults to HadoopPoolsShimService. However, for SparkGraphComputer, if KryoSerializer is being used, then the kryoShimService is hard-set to UnshadedKryoService (which is a Spark specific shim). This seems much cleaner and less error prone. Will need @dalaro to 'okay' the model.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/49924cc6
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/49924cc6
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/49924cc6
Branch: refs/heads/TINKERPOP-1389
Commit: 49924cc6697f255a553dac8b3b896b10e00e3211
Parents: 532ed59
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Oct 25 09:01:44 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Oct 25 09:01:44 2016 -0600
----------------------------------------------------------------------
.../io/gryo/kryoshim/KryoShimService.java | 27 +---
.../io/gryo/kryoshim/KryoShimServiceLoader.java | 140 +++----------------
.../gremlin/hadoop/structure/HadoopGraph.java | 7 +-
.../structure/io/HadoopPoolShimService.java | 5 -
...n.structure.io.gryo.kryoshim.KryoShimService | 1 -
.../process/computer/SparkGraphComputer.java | 26 ++--
.../unshaded/UnshadedKryoShimService.java | 13 +-
...n.structure.io.gryo.kryoshim.KryoShimService | 1 -
.../SparkHadoopGraphGryoSerializerProvider.java | 6 +-
.../computer/SparkHadoopGraphProvider.java | 6 -
.../spark/structure/io/ToyGraphInputRDD.java | 1 -
11 files changed, 37 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
index b8880a4..f8abd4e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
@@ -57,38 +57,13 @@ public interface KryoShimService {
/**
* Serializes an object to an output stream. This may flush the output stream.
*
- * @param o the object to serialize
+ * @param o the object to serialize
* @param sink the stream into which the serialized object is written
*/
public void writeClassAndObject(final Object o, final OutputStream sink);
/**
- * Returns this service's relative priority number. Unless explicitly overridden through a
- * system property ({@link KryoShimServiceLoader#KRYO_SHIM_SERVICE}),
- * the service implementation with the numerically highest priority will be used
- * and all others ignored. In other words, the highest priority wins (in the absence of a
- * system property override).
- * <p>
- * TinkerPop's current default implementation uses priority value zero.
- * <p>
- * Third-party implementations of this interface should (but are not technically required)
- * to use a priority value with absolute value greater than 100.
- * <p>
- * The implementation currently breaks priority ties by lexicographical comparison of
- * fully-qualified package-and-classname, but this tie-breaking behavior should be
- * considered undefined and subject to future change. Ties are ignored if the service
- * is explicitly set through the system property mentioned above.
- *
- * @return this implementation's priority value
- */
- public int getPriority();
-
- /**
* Attempt to incorporate the supplied configuration in future read/write calls.
- * <p>
- * This method is a wart that exists essentially just to support the old
- * {@link HadoopPools#initialize(Configuration)} use-case.
- * <p>
* This method is not guaranteed to have any effect on an instance of this interface
* after {@link #writeClassAndObject(Object, OutputStream)} or {@link #readClassAndObject(InputStream)}
* has been invoked on that particular instance.
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
index c026130..ac815b1 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
@@ -24,9 +24,6 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.ServiceLoader;
/**
@@ -36,19 +33,18 @@ public class KryoShimServiceLoader {
private static volatile KryoShimService cachedShimService;
- private static volatile Configuration conf;
+ private static volatile Configuration configuration;
private static final Logger log = LoggerFactory.getLogger(KryoShimServiceLoader.class);
/**
* Set this system property to the fully-qualified name of a {@link KryoShimService}
- * package-and-classname to force it into service. Setting this property causes the
- * priority-selection mechanism ({@link KryoShimService#getPriority()}) to be ignored.
+ * package-and-classname to force it into service.
*/
public static final String KRYO_SHIM_SERVICE = "gremlin.io.kryoShimService";
- public static void applyConfiguration(final Configuration conf) {
- KryoShimServiceLoader.conf = conf;
+ public static void applyConfiguration(final Configuration configuration) {
+ KryoShimServiceLoader.configuration = configuration;
load(true);
}
@@ -63,71 +59,23 @@ public class KryoShimServiceLoader {
* @return the shim service
*/
public static KryoShimService load(final boolean forceReload) {
-
- if (null != cachedShimService && !forceReload) {
+ if (null != cachedShimService && !forceReload)
return cachedShimService;
- }
-
- final ArrayList<KryoShimService> services = new ArrayList<>();
-
- final ServiceLoader<KryoShimService> sl = ServiceLoader.load(KryoShimService.class);
-
- KryoShimService result = null;
-
- synchronized (KryoShimServiceLoader.class) {
- if (forceReload) {
- sl.reload();
- }
-
- for (KryoShimService kss : sl) {
- services.add(kss);
- }
- }
-
- String shimClass = System.getProperty(KRYO_SHIM_SERVICE);
-
- if (null != shimClass) {
- for (KryoShimService kss : services) {
- if (kss.getClass().getCanonicalName().equals(shimClass)) {
- log.info("Set {} provider to {} ({}) from system property {}={}",
- KryoShimService.class.getSimpleName(), kss, kss.getClass(),
- KRYO_SHIM_SERVICE, shimClass);
- result = kss;
- }
- }
- } else {
- Collections.sort(services, KryoShimServiceComparator.INSTANCE);
-
- for (KryoShimService kss : services) {
- log.debug("Found Kryo shim service class {} (priority {})", kss.getClass(), kss.getPriority());
- }
-
- if (0 != services.size()) {
- result = services.get(services.size() - 1);
-
- log.info("Set {} provider to {} ({}) because its priority value ({}) is the best available",
- KryoShimService.class.getSimpleName(), result, result.getClass(), result.getPriority());
- }
- }
-
-
- if (null == result) {
- throw new IllegalStateException("Unable to load KryoShimService");
- }
+ if (!configuration.containsKey(KRYO_SHIM_SERVICE))
+ throw new IllegalArgumentException("The provided configuration does not contain a " + KRYO_SHIM_SERVICE + " property");
- final Configuration userConf = conf;
-
- if (null != userConf) {
- log.info("Configuring {} provider {} with user-provided configuration",
- KryoShimService.class.getSimpleName(), result);
- result.applyConfiguration(userConf);
+ try {
+ cachedShimService = ((Class<? extends KryoShimService>) Class.forName(configuration.getString(KRYO_SHIM_SERVICE))).newInstance();
+ cachedShimService.applyConfiguration(configuration);
+ log.info("Using the following KryoShimService: " + cachedShimService.getClass().getCanonicalName());
+ return cachedShimService;
+ } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+ throw new IllegalStateException(e.getMessage(), e);
}
-
- return cachedShimService = result;
}
/**
- * Equivalent to {@link #load(boolean)} with the parameter {@code true}.
+ * Equivalent to {@link #load(boolean)} with the parameter {@code false}.
*
* @return the (possibly cached) shim service
*/
@@ -136,7 +84,7 @@ public class KryoShimServiceLoader {
}
/**
- * A loose abstraction of {@link org.apache.tinkerpop.shaded.kryo.Kryo#writeClassAndObject(Output, Object)},
+ * A loose abstraction of {@link org.apache.tinkerpop.shaded.kryo.Kryo#writeClassAndObject},
* where the {@code output} parameter is an internally-created {@link ByteArrayOutputStream}. Returns
* the byte array underlying that stream.
*
@@ -144,17 +92,13 @@ public class KryoShimServiceLoader {
* @return the serialized form
*/
public static byte[] writeClassAndObjectToBytes(final Object o) {
- final KryoShimService shimService = load();
-
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- shimService.writeClassAndObject(o, baos);
-
+ load().writeClassAndObject(o, baos);
return baos.toByteArray();
}
/**
- * A loose abstraction of {@link org.apache.tinkerpop.shaded.kryo.Kryo#readClassAndObject(Input)},
+ * A loose abstraction of {@link org.apache.tinkerpop.shaded.kryo.Kryo#readClassAndObject},
* where the {@code input} parameter is {@code source}. Returns the deserialized object.
*
* @param source an input stream containing data for a serialized object class and instance
@@ -162,52 +106,6 @@ public class KryoShimServiceLoader {
* @return the deserialized object
*/
public static <T> T readClassAndObject(final InputStream source) {
- final KryoShimService shimService = load();
-
- return (T) shimService.readClassAndObject(source);
- }
-
- /**
- * Selects the service with greatest {@link KryoShimService#getPriority()}
- * (not absolute value).
- * <p>
- * Breaks ties with lexicographical comparison of classnames where the
- * name that sorts last is considered to have highest priority. Ideally
- * nothing should rely on that tiebreaking behavior, but it beats random
- * selection in case a user ever gets into that situation by accident and
- * tries to figure out what's going on.
- */
- private enum KryoShimServiceComparator implements Comparator<KryoShimService> {
- INSTANCE;
-
- @Override
- public int compare(final KryoShimService a, final KryoShimService b) {
- final int ap = a.getPriority();
- final int bp = b.getPriority();
-
- if (ap < bp) {
- return -1;
- } else if (bp < ap) {
- return 1;
- } else {
- final int result = a.getClass().getCanonicalName().compareTo(b.getClass().getCanonicalName());
-
- if (0 == result) {
- log.warn("Found two {} implementations with the same canonical classname: {}. " +
- "This may indicate a problem with the classpath/classloader such as " +
- "duplicate or conflicting copies of the file " +
- "META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService.",
- a.getClass().getCanonicalName());
- } else {
- final String winner = 0 < result ? a.getClass().getCanonicalName() : b.getClass().getCanonicalName();
- log.warn("{} implementations {} and {} are tied with priority value {}. " +
- "Preferring {} to the other because it has a lexicographically greater classname. " +
- "Consider setting the system property \"{}\" instead of relying on priority tie-breaking.",
- KryoShimService.class.getSimpleName(), a, b, ap, winner, KRYO_SHIM_SERVICE);
- }
-
- return result;
- }
- }
+ return (T) load().readClassAndObject(source);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index d0f50d0..7043070 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -25,12 +25,14 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopEdgeIterator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopVertexIterator;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
@@ -109,7 +111,8 @@ import java.util.stream.Stream;
test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyMatchTest$Traversals",
method = "g_V_matchXa_0sungBy_b__a_0sungBy_c__b_writtenBy_d__c_writtenBy_e__d_hasXname_George_HarisonX__e_hasXname_Bob_MarleyXX",
reason = "Hadoop-Gremlin is OLAP-oriented and for OLTP operations, linear-scan joins are required. This particular tests takes many minutes to execute.",
- computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"}) // this is a nasty long test, just do it once in Java MatchTest
+ computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"})
+// this is a nasty long test, just do it once in Java MatchTest
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyMatchTest$Traversals",
method = "g_V_matchXa_0sungBy_b__a_0writtenBy_c__b_writtenBy_d__c_sungBy_d__d_hasXname_GarciaXX",
@@ -262,6 +265,8 @@ public final class HadoopGraph implements Graph {
private HadoopGraph(final Configuration configuration) {
this.configuration = new HadoopConfiguration(configuration);
+ if (!this.configuration.containsKey(KryoShimServiceLoader.KRYO_SHIM_SERVICE))
+ this.configuration.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, HadoopPoolShimService.class.getCanonicalName());
}
public static HadoopGraph open() {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
index 3fad4fd..f2e80ae 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
@@ -64,11 +64,6 @@ public class HadoopPoolShimService implements KryoShimService {
}
@Override
- public int getPriority() {
- return 0;
- }
-
- @Override
public void applyConfiguration(final Configuration conf) {
HadoopPools.initialize(conf);
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService b/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
deleted file mode 100644
index 0b27e72..0000000
--- a/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService # HadoopPools provides/caches instances of TinkerPop's shaded Kryo
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index c7d0cfb..42f2493 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -69,8 +69,6 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
-import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator;
-import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.io.Storage;
@@ -97,7 +95,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
* An {@code ExecutorService} that schedules up background work. Since a {@link GraphComputer} is only used once
* for a {@link VertexProgram} a single threaded executor is sufficient.
*/
- private final ExecutorService computerService = Executors.newSingleThreadExecutor(threadFactoryBoss);
+ private final ExecutorService computerService = Executors.newSingleThreadExecutor(this.threadFactoryBoss);
static {
TraversalStrategies.GlobalCache.registerStrategies(SparkGraphComputer.class,
@@ -110,19 +108,11 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
super(hadoopGraph);
this.sparkConfiguration = new HadoopConfiguration();
ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
- if (KryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER, null)) &&
- GryoRegistrator.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_KRYO_REGISTRATOR, null))) {
- System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, UnshadedKryoShimService.class.getCanonicalName());
- } else if (GryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER, null)) &&
- !this.sparkConfiguration.containsKey(Constants.SPARK_KRYO_REGISTRATOR)) {
- System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, HadoopPoolShimService.class.getCanonicalName());
- }
- if (null != System.getProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, null)) {
- final String shimService = System.getProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE);
- this.sparkConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
- (this.sparkConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, "") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim());
- this.sparkConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
- (this.sparkConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim());
+ if (HadoopPoolShimService.class.getCanonicalName().equals(this.sparkConfiguration.getString(KryoShimServiceLoader.KRYO_SHIM_SERVICE, HadoopPoolShimService.class.getCanonicalName()))) {
+ this.sparkConfiguration.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE,
+ KryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER, null)) ?
+ UnshadedKryoShimService.class.getCanonicalName() :
+ HadoopPoolShimService.class.getCanonicalName());
}
}
@@ -154,8 +144,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
final long startTime = System.currentTimeMillis();
// apache and hadoop configurations that are used throughout the graph computer computation
final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
- if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER))
- graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+ //if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER))
+ // graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
index 0789d6a..5d963c9 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
@@ -44,14 +44,10 @@ import java.util.concurrent.LinkedBlockingQueue;
public class UnshadedKryoShimService implements KryoShimService {
private static final Logger log = LoggerFactory.getLogger(UnshadedKryoShimService.class);
-
private static final LinkedBlockingQueue<Kryo> KRYOS = new LinkedBlockingQueue<>();
-
+ private static final Configuration EMPTY_CONFIGURATION = new BaseConfiguration();
private static volatile boolean initialized;
- public UnshadedKryoShimService() {
- }
-
@Override
public Object readClassAndObject(final InputStream source) {
@@ -99,17 +95,12 @@ public class UnshadedKryoShimService implements KryoShimService {
}
@Override
- public int getPriority() {
- return -50;
- }
-
- @Override
public void applyConfiguration(final Configuration conf) {
initialize(conf);
}
private LinkedBlockingQueue<Kryo> initialize() {
- return initialize(new BaseConfiguration());
+ return initialize(EMPTY_CONFIGURATION);
}
private LinkedBlockingQueue<Kryo> initialize(final Configuration conf) {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService b/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
deleted file mode 100644
index 68712a6..0000000
--- a/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService # Supports Spark
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
index 9820b7b..33b538b 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
@@ -21,10 +21,8 @@ package org.apache.tinkerpop.gremlin.spark.process.computer;
import org.apache.tinkerpop.gremlin.LoadGraphWith;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
import java.util.Map;
@@ -34,9 +32,7 @@ import java.util.Map;
public final class SparkHadoopGraphGryoSerializerProvider extends SparkHadoopGraphProvider {
public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
- if (this.getClass().equals(SparkHadoopGraphGryoSerializerProvider.class) &&
- !HadoopPoolShimService.class.getCanonicalName().equals(System.getProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, null)))
- Spark.close();
+ Spark.close();
final Map<String, Object> config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith);
config.put(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
config.remove(Constants.SPARK_KRYO_REGISTRATOR);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index c5b5083..014381e 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -42,9 +42,7 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck;
import org.apache.tinkerpop.gremlin.spark.structure.io.ToyGraphInputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator;
-import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService;
import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
import java.util.Map;
@@ -56,10 +54,6 @@ public class SparkHadoopGraphProvider extends HadoopGraphProvider {
@Override
public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
- if (this.getClass().equals(SparkHadoopGraphProvider.class) &&
- !UnshadedKryoShimService.class.getCanonicalName().equals(System.getProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, null)))
- Spark.close();
-
final Map<String, Object> config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith);
config.put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); // this makes the test suite go really fast
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java
index 4cd8cea..d59b4e1 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java
@@ -47,7 +47,6 @@ public final class ToyGraphInputRDD implements InputRDD {
@Override
public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
- KryoShimServiceLoader.applyConfiguration(TinkerGraph.open().configuration());
final List<VertexWritable> vertices;
if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("modern"))
vertices = IteratorUtils.list(IteratorUtils.map(TinkerFactory.createModern().vertices(), VertexWritable::new));