You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/20 00:45:24 UTC

incubator-tinkerpop git commit: couldn't help myself. once I walked away from the computer I realized how to make GiraphMemory work. Got it working and test cases passing. Only one test fails in SparkGraphComputer. I will handle THAT next week.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1166 b50a43ce7 -> 0ae584cae


couldn't help myself. once I walked away from the computer I realized how to make GiraphMemory work. Got it working and test cases passing. Only one test fails in SparkGraphComputer. I will handle THAT next week.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/0ae584ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/0ae584ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/0ae584ca

Branch: refs/heads/TINKERPOP-1166
Commit: 0ae584cae51ab15eef7de776cf8c049b64ace852
Parents: b50a43c
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Feb 19 16:45:18 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Feb 19 16:45:18 2016 -0700

----------------------------------------------------------------------
 .../giraph/process/computer/GiraphMemory.java   | 54 +++++++-------------
 .../process/computer/MemoryAggregator.java      | 44 +++++++---------
 .../process/computer/util/MapMemory.java        |  6 +--
 .../gremlin/structure/io/gryo/GryoMapper.java   |  9 +++-
 .../hadoop/structure/io/ObjectWritable.java     |  4 +-
 5 files changed, 49 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0ae584ca/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
index 50c0058..05f0e9f 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
@@ -34,16 +34,15 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.javatuples.Pair;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.BinaryOperator;
+import java.util.stream.Collectors;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -86,7 +85,6 @@ public final class GiraphMemory extends MasterCompute implements Memory {
             try {
                 for (final MemoryComputeKey key : this.memoryKeys.values()) {
                     this.registerPersistentAggregator(key.getKey(), MemoryAggregator.class);
-                    this.setAggregatedValue(key.getKey(), new ObjectWritable(key));
                 }
             } catch (final Exception e) {
                 throw new IllegalStateException(e.getMessage(), e);
@@ -135,7 +133,7 @@ public final class GiraphMemory extends MasterCompute implements Memory {
 
     @Override
     public Set<String> keys() {
-        return this.memoryKeys.keySet();
+        return this.memoryKeys.values().stream().filter(key -> this.exists(key.getKey())).map(MemoryComputeKey::getKey).collect(Collectors.toSet());
     }
 
     @Override
@@ -147,59 +145,43 @@ public final class GiraphMemory extends MasterCompute implements Memory {
     @Override
     public <R> R get(final String key) throws IllegalArgumentException {
         //this.checkKey(key);
-        final Object value = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
-        if (null == value)
+        final ObjectWritable<Pair<BinaryOperator, Object>> value = this.isMasterCompute ?
+                this.<ObjectWritable<Pair<BinaryOperator, Object>>>getAggregatedValue(key) :
+                this.worker.<ObjectWritable<Pair<BinaryOperator, Object>>>getAggregatedValue(key);
+        if (null == value || value.isEmpty())
             throw Memory.Exceptions.memoryDoesNotExist(key);
         else
-            return (R) value;
+            return (R) value.get().getValue1();
     }
 
     @Override
     public void set(final String key, final Object value) {
         this.checkKeyValue(key, value);
-        if (this.isMasterCompute)
-            this.setAggregatedValue(key, new ObjectWritable<>(value));
-        else
-            this.worker.aggregate(key, new ObjectWritable<>(value));
+        if (this.isMasterCompute) {  // only called on setup() and terminate()
+            this.setAggregatedValue(key, new ObjectWritable<>(new Pair<>(this.memoryKeys.get(key).getReducer(), value)));
+        } else {
+            this.worker.aggregate(key, new ObjectWritable<>(new Pair<>(this.memoryKeys.get(key).getReducer(), value)));
+        }
     }
 
     @Override
     public void add(final String key, final Object value) {
         this.checkKeyValue(key, value);
         if (this.isMasterCompute) {  // only called on setup() and terminate()
-            this.setAggregatedValue(key, new ObjectWritable<>(value));
+            this.setAggregatedValue(key, new ObjectWritable<>(new Pair<>(this.memoryKeys.get(key).getReducer(), value)));
         } else {
-            this.worker.aggregate(key, new ObjectWritable<>(value));
+            this.worker.aggregate(key, new ObjectWritable<>(new Pair<>(this.memoryKeys.get(key).getReducer(), value)));
         }
     }
 
     @Override
     public void write(final DataOutput output) {
-        try {
-            final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-            final ObjectOutputStream objects = new ObjectOutputStream(bytes);
-            output.writeInt(bytes.size());
-            objects.writeObject(this.memoryKeys);
-            objects.flush();
-            output.write(bytes.toByteArray());
-        } catch (final Exception e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
+        // all aggregator data is propagated through writables
     }
 
     @Override
     public void readFields(final DataInput input) {
-        try {
-            final byte[] in = new byte[input.readInt()];
-            for (int i = 0; i < in.length; i++) {
-                in[i] = input.readByte();
-            }
-            final ByteArrayInputStream bytes = new ByteArrayInputStream(in);
-            final ObjectInputStream objects = new ObjectInputStream(bytes);
-            this.memoryKeys = (Map<String, MemoryComputeKey>) objects.readObject();
-        } catch (final Exception e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
+        // all aggregator data is propagated through writables
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0ae584ca/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/MemoryAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/MemoryAggregator.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/MemoryAggregator.java
index 4180d2f..4929546 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/MemoryAggregator.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/MemoryAggregator.java
@@ -20,55 +20,49 @@ package org.apache.tinkerpop.gremlin.giraph.process.computer;
 
 import org.apache.giraph.aggregators.Aggregator;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
+import org.javatuples.Pair;
+
+import java.util.function.BinaryOperator;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class MemoryAggregator implements Aggregator<ObjectWritable> {
+public final class MemoryAggregator implements Aggregator<ObjectWritable<Pair<BinaryOperator, Object>>> {
 
-    private ObjectWritable currentObject;
-    private MemoryComputeKey memoryComputeKey;
+    private ObjectWritable<Pair<BinaryOperator, Object>> currentObject = ObjectWritable.<Pair<BinaryOperator, Object>>empty();
 
     public MemoryAggregator() { // for Giraph serialization
 
     }
 
-    public MemoryAggregator(final MemoryComputeKey memoryComputeKey) {
-        this.currentObject = ObjectWritable.empty();
-        this.memoryComputeKey = memoryComputeKey;
-    }
-
     @Override
-    public ObjectWritable getAggregatedValue() {
+    public ObjectWritable<Pair<BinaryOperator, Object>> getAggregatedValue() {
         return this.currentObject;
     }
 
     @Override
-    public void setAggregatedValue(final ObjectWritable object) {
-        if (null == object)
-            this.currentObject = ObjectWritable.empty();
-        else if (object.get() instanceof MemoryComputeKey)
-            this.memoryComputeKey = (MemoryComputeKey) object.get();
-        else
+    public void setAggregatedValue(final ObjectWritable<Pair<BinaryOperator, Object>> object) {
+        if (null != object)
             this.currentObject = object;
     }
 
     @Override
-    public void reset() {
-        this.currentObject = ObjectWritable.empty();
+    public void aggregate(final ObjectWritable<Pair<BinaryOperator, Object>> object) {
+        if (null == object)
+            return;
+        else if (this.currentObject.isEmpty())
+            this.currentObject = object;
+        else if (!object.isEmpty())
+            this.currentObject.set(new Pair<>(object.get().getValue0(), object.get().getValue0().apply(this.currentObject.get().getValue1(), object.get().getValue1())));
     }
 
     @Override
-    public ObjectWritable createInitialValue() {
-        return ObjectWritable.empty();
+    public void reset() {
+        this.currentObject = ObjectWritable.<Pair<BinaryOperator, Object>>empty();
     }
 
     @Override
-    public void aggregate(final ObjectWritable object) {
-        if (this.currentObject.isEmpty())
-            this.currentObject = object;
-        else
-            this.currentObject.set(this.memoryComputeKey.getReducer().apply(this.currentObject.get(), object.get()));
+    public ObjectWritable<Pair<BinaryOperator, Object>> createInitialValue() {
+        return ObjectWritable.<Pair<BinaryOperator, Object>>empty();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0ae584ca/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
index d63094a..7091062 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
@@ -45,9 +45,7 @@ public final class MapMemory implements Memory.Admin, Serializable {
     }
 
     public MapMemory(final Memory otherMemory) {
-        otherMemory.keys().forEach(key -> {
-            this.memoryMap.put(key, otherMemory.get(key));
-        });
+        otherMemory.keys().forEach(key -> this.memoryMap.put(key, otherMemory.get(key)));
         this.iteration = otherMemory.getIteration();
     }
 
@@ -56,7 +54,7 @@ public final class MapMemory implements Memory.Admin, Serializable {
     }
 
     public void addMapReduceMemoryKey(final MapReduce mapReduce) {
-        //this.memoryComputeKeys.add(mapReduce.getMemoryKey());
+       this.memoryComputeKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), MemoryComputeKey.setOperator(),false));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0ae584ca/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
index 9a6f636..94fd7d6 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.gremlin.structure.io.gryo;
 
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 import org.apache.tinkerpop.gremlin.process.traversal.Contains;
 import org.apache.tinkerpop.gremlin.process.traversal.Path;
@@ -296,7 +297,7 @@ public final class GryoMapper implements Mapper<Kryo> {
             add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(AtomicLong.class, null, 79));
             add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(DependantMutableMetrics.class, null, 80));
             add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Pair.class, kryo -> new PairSerializer(), 88));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(TraversalExplanation.class, null, 106)); // ***LAST ID**
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(TraversalExplanation.class, null, 106));
 
             add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Duration.class, kryo -> new JavaTimeSerializers.DurationSerializer(), 93));
             add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Instant.class, kryo -> new JavaTimeSerializers.InstantSerializer(), 94));
@@ -311,6 +312,12 @@ public final class GryoMapper implements Mapper<Kryo> {
             add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(YearMonth.class, kryo -> new JavaTimeSerializers.YearMonthSerializer(), 103));
             add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(ZonedDateTime.class, kryo -> new JavaTimeSerializers.ZonedDateTimeSerializer(), 104));
             add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(ZoneOffset.class, kryo -> new JavaTimeSerializers.ZoneOffsetSerializer(), 105));
+
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.AndOperator.class, null, 107));
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.OrOperator.class, null, 108));
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.SetOperator.class, null, 109));
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.SumLongOperator.class, null, 110));
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.SumIntegerOperator.class, null, 111)); // ***LAST ID**
         }};
 
         private final List<IoRegistry> registries = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0ae584ca/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
index a371bc3..5c45400 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
@@ -109,8 +109,8 @@ public final class ObjectWritable<T> implements WritableComparable<ObjectWritabl
         return null == this.t;
     }
 
-    public static ObjectWritable empty() {
-        return new ObjectWritable(null);
+    public static <A> ObjectWritable<A> empty() {
+        return new ObjectWritable<>(null);
     }
 
     @Override