You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/06/07 13:57:27 UTC

[26/34] incubator-tinkerpop git commit: worked with @dalaro to fix a bug in HadoopPoolShimService. Reverted my last work on VertexProgramHelper.

worked with @dalaro to fix a bug in HadoopPoolShimService. Reverted my last work on VertexProgramHelper.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/90e31599
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/90e31599
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/90e31599

Branch: refs/heads/TINKERPOP-1278
Commit: 90e3159969363e6a3383ffc64d58c27f76384a55
Parents: 797364c
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Jun 6 16:55:42 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Jun 6 16:55:42 2016 -0600

----------------------------------------------------------------------
 .../computer/util/VertexProgramHelper.java      | 33 +++++---------------
 .../io/gryo/kryoshim/KryoShimServiceLoader.java | 17 +++++-----
 .../structure/io/HadoopPoolShimService.java     |  2 +-
 .../gremlin/hadoop/HadoopGraphProvider.java     |  2 +-
 .../spark/process/computer/SparkExecutor.java   | 11 +++----
 5 files changed, 23 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/90e31599/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java
index 2b3a0b2..bc67866 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java
@@ -25,13 +25,8 @@ import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 import org.apache.tinkerpop.gremlin.util.Serializer;
-import org.apache.tinkerpop.shaded.kryo.io.Input;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -42,8 +37,6 @@ import java.util.Set;
  */
 public final class VertexProgramHelper {
 
-    private static final GryoPool GRYO_POOL = GryoPool.build().create();
-
     private VertexProgramHelper() {
     }
 
@@ -74,33 +67,21 @@ public final class VertexProgramHelper {
             final String byteString = Arrays.toString(Serializer.serializeObject(object));
             configuration.setProperty(key, byteString.substring(1, byteString.length() - 1));
         } catch (final IOException e) {
-            try {
-                final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-                GRYO_POOL.doWithWriter(kryo -> kryo.writeObject(outputStream, object));
-                String byteString = Arrays.toString(outputStream.toByteArray());
-                configuration.setProperty(key, byteString.substring(1, byteString.length() - 1));
-            } catch (final Exception e1) {
-                throw new IllegalArgumentException(e1.getMessage(), e1);
-            }
+            throw new IllegalArgumentException(e.getMessage(), e);
         }
     }
 
     public static <T> T deserialize(final Configuration configuration, final String key) {
-        final String[] stringBytes = configuration.getString(key).split(",");
-        byte[] bytes = new byte[stringBytes.length];
-        for (int i = 0; i < stringBytes.length; i++) {
-            bytes[i] = Byte.valueOf(stringBytes[i].trim());
-        }
         try {
+            final String[] stringBytes = configuration.getString(key).split(",");
+            byte[] bytes = new byte[stringBytes.length];
+            for (int i = 0; i < stringBytes.length; i++) {
+                bytes[i] = Byte.valueOf(stringBytes[i].trim());
+            }
             return (T) Serializer.deserializeObject(bytes);
         } catch (final IOException | ClassNotFoundException e) {
-            try {
-                return (T) GRYO_POOL.readWithKryo(kryo -> kryo.readClassAndObject(new Input(new ByteArrayInputStream(bytes))));
-            } catch (final Exception e1) {
-                throw new IllegalArgumentException(e1.getMessage(), e1);
-            }
+            throw new IllegalArgumentException(e.getMessage(), e);
         }
-
     }
 
     public static <S, E> Traversal.Admin<S, E> reverse(final Traversal.Admin<S, E> traversal) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/90e31599/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
index 9184dd0..fd57a3c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
@@ -51,6 +51,7 @@ public class KryoShimServiceLoader {
 
     public static void applyConfiguration(Configuration conf) {
         KryoShimServiceLoader.conf = conf;
+        load(true);
     }
 
     /**
@@ -195,20 +196,20 @@ public class KryoShimServiceLoader {
 
                 if (0 == result) {
                     log.warn("Found two {} implementations with the same canonical classname: {}.  " +
-                             "This may indicate a problem with the classpath/classloader such as " +
-                             "duplicate or conflicting copies of the file " +
-                             "META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService.",
-                             a.getClass().getCanonicalName());
+                                    "This may indicate a problem with the classpath/classloader such as " +
+                                    "duplicate or conflicting copies of the file " +
+                                    "META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService.",
+                            a.getClass().getCanonicalName());
                 } else {
                     String winner = 0 < result ? a.getClass().getCanonicalName() : b.getClass().getCanonicalName();
                     log.warn("{} implementations {} and {} are tied with priority value {}.  " +
-                             "Preferring {} to the other because it has a lexicographically greater classname.  " +
-                             "Consider setting the system property \"{}\" instead of relying on priority tie-breaking.",
-                             KryoShimService.class.getSimpleName(), a, b, ap, winner, SHIM_CLASS_SYSTEM_PROPERTY);
+                                    "Preferring {} to the other because it has a lexicographically greater classname.  " +
+                                    "Consider setting the system property \"{}\" instead of relying on priority tie-breaking.",
+                            KryoShimService.class.getSimpleName(), a, b, ap, winner, SHIM_CLASS_SYSTEM_PROPERTY);
                 }
 
                 return result;
             }
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/90e31599/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
index 5753d90..df72b71 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
@@ -71,6 +71,6 @@ public class HadoopPoolShimService implements KryoShimService {
 
     @Override
     public void applyConfiguration(Configuration conf) {
-        KryoShimServiceLoader.applyConfiguration(conf);
+        HadoopPools.initialize(conf);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/90e31599/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
index 57157db..e36c08d 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
@@ -112,7 +112,7 @@ public class HadoopGraphProvider extends AbstractGraphProvider {
 
     @Override
     public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
-        System.setProperty(SHIM_CLASS_SYSTEM_PROPERTY, HadoopPoolShimService.class.getCanonicalName());
+        System.clearProperty(SHIM_CLASS_SYSTEM_PROPERTY);
         this.graphSONInput = RANDOM.nextBoolean();
         return new HashMap<String, Object>() {{
             put(Graph.GRAPH, HadoopGraph.class.getName());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/90e31599/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 9e5ac53..4db8086 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -24,7 +24,6 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
@@ -171,7 +170,7 @@ public final class SparkExecutor {
             assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
         newViewIncomingRDD
                 .foreachPartition(partitionIterator -> {
-                    HadoopPools.initialize(apacheConfiguration);
+                    KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
                 }); // need to complete a task so its BSP and the memory for this iteration is updated
         return newViewIncomingRDD;
     }
@@ -206,7 +205,7 @@ public final class SparkExecutor {
             final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce,
             final Configuration apacheConfiguration) {
         JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
-            HadoopPools.initialize(apacheConfiguration);
+            KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
             return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
         });
         if (mapReduce.getMapKeySort().isPresent())
@@ -217,7 +216,7 @@ public final class SparkExecutor {
     public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD,
                                                                     final Configuration apacheConfiguration) {
         return mapRDD.mapPartitionsToPair(partitionIterator -> {
-            HadoopPools.initialize(apacheConfiguration);
+            KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
             return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
         });
     }
@@ -226,11 +225,11 @@ public final class SparkExecutor {
             final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce,
             final Configuration apacheConfiguration) {
         JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
-            HadoopPools.initialize(apacheConfiguration);
+            KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
             return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
         });
         if (mapReduce.getReduceKeySort().isPresent())
             reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get(), true, 1);
         return reduceRDD;
     }
-}
+}
\ No newline at end of file