You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/06/07 13:57:27 UTC
[26/34] incubator-tinkerpop git commit: worked with @dalaro to fix a
bug in HadoopPoolShimService. Reverted my last work on VertexProgramHelper.
worked with @dalaro to fix a bug in HadoopPoolShimService. Reverted my last work on VertexProgramHelper.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/90e31599
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/90e31599
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/90e31599
Branch: refs/heads/TINKERPOP-1278
Commit: 90e3159969363e6a3383ffc64d58c27f76384a55
Parents: 797364c
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Jun 6 16:55:42 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Jun 6 16:55:42 2016 -0600
----------------------------------------------------------------------
.../computer/util/VertexProgramHelper.java | 33 +++++---------------
.../io/gryo/kryoshim/KryoShimServiceLoader.java | 17 +++++-----
.../structure/io/HadoopPoolShimService.java | 2 +-
.../gremlin/hadoop/HadoopGraphProvider.java | 2 +-
.../spark/process/computer/SparkExecutor.java | 11 +++----
5 files changed, 23 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/90e31599/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java
index 2b3a0b2..bc67866 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java
@@ -25,13 +25,8 @@ import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
import org.apache.tinkerpop.gremlin.util.Serializer;
-import org.apache.tinkerpop.shaded.kryo.io.Input;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
@@ -42,8 +37,6 @@ import java.util.Set;
*/
public final class VertexProgramHelper {
- private static final GryoPool GRYO_POOL = GryoPool.build().create();
-
private VertexProgramHelper() {
}
@@ -74,33 +67,21 @@ public final class VertexProgramHelper {
final String byteString = Arrays.toString(Serializer.serializeObject(object));
configuration.setProperty(key, byteString.substring(1, byteString.length() - 1));
} catch (final IOException e) {
- try {
- final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- GRYO_POOL.doWithWriter(kryo -> kryo.writeObject(outputStream, object));
- String byteString = Arrays.toString(outputStream.toByteArray());
- configuration.setProperty(key, byteString.substring(1, byteString.length() - 1));
- } catch (final Exception e1) {
- throw new IllegalArgumentException(e1.getMessage(), e1);
- }
+ throw new IllegalArgumentException(e.getMessage(), e);
}
}
public static <T> T deserialize(final Configuration configuration, final String key) {
- final String[] stringBytes = configuration.getString(key).split(",");
- byte[] bytes = new byte[stringBytes.length];
- for (int i = 0; i < stringBytes.length; i++) {
- bytes[i] = Byte.valueOf(stringBytes[i].trim());
- }
try {
+ final String[] stringBytes = configuration.getString(key).split(",");
+ byte[] bytes = new byte[stringBytes.length];
+ for (int i = 0; i < stringBytes.length; i++) {
+ bytes[i] = Byte.valueOf(stringBytes[i].trim());
+ }
return (T) Serializer.deserializeObject(bytes);
} catch (final IOException | ClassNotFoundException e) {
- try {
- return (T) GRYO_POOL.readWithKryo(kryo -> kryo.readClassAndObject(new Input(new ByteArrayInputStream(bytes))));
- } catch (final Exception e1) {
- throw new IllegalArgumentException(e1.getMessage(), e1);
- }
+ throw new IllegalArgumentException(e.getMessage(), e);
}
-
}
public static <S, E> Traversal.Admin<S, E> reverse(final Traversal.Admin<S, E> traversal) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/90e31599/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
index 9184dd0..fd57a3c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
@@ -51,6 +51,7 @@ public class KryoShimServiceLoader {
public static void applyConfiguration(Configuration conf) {
KryoShimServiceLoader.conf = conf;
+ load(true);
}
/**
@@ -195,20 +196,20 @@ public class KryoShimServiceLoader {
if (0 == result) {
log.warn("Found two {} implementations with the same canonical classname: {}. " +
- "This may indicate a problem with the classpath/classloader such as " +
- "duplicate or conflicting copies of the file " +
- "META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService.",
- a.getClass().getCanonicalName());
+ "This may indicate a problem with the classpath/classloader such as " +
+ "duplicate or conflicting copies of the file " +
+ "META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService.",
+ a.getClass().getCanonicalName());
} else {
String winner = 0 < result ? a.getClass().getCanonicalName() : b.getClass().getCanonicalName();
log.warn("{} implementations {} and {} are tied with priority value {}. " +
- "Preferring {} to the other because it has a lexicographically greater classname. " +
- "Consider setting the system property \"{}\" instead of relying on priority tie-breaking.",
- KryoShimService.class.getSimpleName(), a, b, ap, winner, SHIM_CLASS_SYSTEM_PROPERTY);
+ "Preferring {} to the other because it has a lexicographically greater classname. " +
+ "Consider setting the system property \"{}\" instead of relying on priority tie-breaking.",
+ KryoShimService.class.getSimpleName(), a, b, ap, winner, SHIM_CLASS_SYSTEM_PROPERTY);
}
return result;
}
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/90e31599/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
index 5753d90..df72b71 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
@@ -71,6 +71,6 @@ public class HadoopPoolShimService implements KryoShimService {
@Override
public void applyConfiguration(Configuration conf) {
- KryoShimServiceLoader.applyConfiguration(conf);
+ HadoopPools.initialize(conf);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/90e31599/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
index 57157db..e36c08d 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
@@ -112,7 +112,7 @@ public class HadoopGraphProvider extends AbstractGraphProvider {
@Override
public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
- System.setProperty(SHIM_CLASS_SYSTEM_PROPERTY, HadoopPoolShimService.class.getCanonicalName());
+ System.clearProperty(SHIM_CLASS_SYSTEM_PROPERTY);
this.graphSONInput = RANDOM.nextBoolean();
return new HashMap<String, Object>() {{
put(Graph.GRAPH, HadoopGraph.class.getName());
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/90e31599/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 9e5ac53..4db8086 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -24,7 +24,6 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
@@ -171,7 +170,7 @@ public final class SparkExecutor {
assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
newViewIncomingRDD
.foreachPartition(partitionIterator -> {
- HadoopPools.initialize(apacheConfiguration);
+ KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
}); // need to complete a task so its BSP and the memory for this iteration is updated
return newViewIncomingRDD;
}
@@ -206,7 +205,7 @@ public final class SparkExecutor {
final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce,
final Configuration apacheConfiguration) {
JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
- HadoopPools.initialize(apacheConfiguration);
+ KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
});
if (mapReduce.getMapKeySort().isPresent())
@@ -217,7 +216,7 @@ public final class SparkExecutor {
public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD,
final Configuration apacheConfiguration) {
return mapRDD.mapPartitionsToPair(partitionIterator -> {
- HadoopPools.initialize(apacheConfiguration);
+ KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
});
}
@@ -226,11 +225,11 @@ public final class SparkExecutor {
final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce,
final Configuration apacheConfiguration) {
JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
- HadoopPools.initialize(apacheConfiguration);
+ KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
});
if (mapReduce.getReduceKeySort().isPresent())
reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get(), true, 1);
return reduceRDD;
}
-}
+}
\ No newline at end of file