You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/20 00:45:24 UTC
incubator-tinkerpop git commit: Couldn't help myself. Once I walked
away from the computer I realized how to make GiraphMemory work. Got it
working and test cases passing. Only one test fails in SparkGraphComputer. I
will handle THAT next week.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1166 b50a43ce7 -> 0ae584cae
Couldn't help myself. Once I walked away from the computer I realized how to make GiraphMemory work. Got it working and test cases passing. Only one test fails in SparkGraphComputer. I will handle THAT next week.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/0ae584ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/0ae584ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/0ae584ca
Branch: refs/heads/TINKERPOP-1166
Commit: 0ae584cae51ab15eef7de776cf8c049b64ace852
Parents: b50a43c
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Feb 19 16:45:18 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Feb 19 16:45:18 2016 -0700
----------------------------------------------------------------------
.../giraph/process/computer/GiraphMemory.java | 54 +++++++-------------
.../process/computer/MemoryAggregator.java | 44 +++++++---------
.../process/computer/util/MapMemory.java | 6 +--
.../gremlin/structure/io/gryo/GryoMapper.java | 9 +++-
.../hadoop/structure/io/ObjectWritable.java | 4 +-
5 files changed, 49 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0ae584ca/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
index 50c0058..05f0e9f 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
@@ -34,16 +34,15 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.javatuples.Pair;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.function.BinaryOperator;
+import java.util.stream.Collectors;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -86,7 +85,6 @@ public final class GiraphMemory extends MasterCompute implements Memory {
try {
for (final MemoryComputeKey key : this.memoryKeys.values()) {
this.registerPersistentAggregator(key.getKey(), MemoryAggregator.class);
- this.setAggregatedValue(key.getKey(), new ObjectWritable(key));
}
} catch (final Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -135,7 +133,7 @@ public final class GiraphMemory extends MasterCompute implements Memory {
@Override
public Set<String> keys() {
- return this.memoryKeys.keySet();
+ return this.memoryKeys.values().stream().filter(key -> this.exists(key.getKey())).map(MemoryComputeKey::getKey).collect(Collectors.toSet());
}
@Override
@@ -147,59 +145,43 @@ public final class GiraphMemory extends MasterCompute implements Memory {
@Override
public <R> R get(final String key) throws IllegalArgumentException {
//this.checkKey(key);
- final Object value = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
- if (null == value)
+ final ObjectWritable<Pair<BinaryOperator, Object>> value = this.isMasterCompute ?
+ this.<ObjectWritable<Pair<BinaryOperator, Object>>>getAggregatedValue(key) :
+ this.worker.<ObjectWritable<Pair<BinaryOperator, Object>>>getAggregatedValue(key);
+ if (null == value || value.isEmpty())
throw Memory.Exceptions.memoryDoesNotExist(key);
else
- return (R) value;
+ return (R) value.get().getValue1();
}
@Override
public void set(final String key, final Object value) {
this.checkKeyValue(key, value);
- if (this.isMasterCompute)
- this.setAggregatedValue(key, new ObjectWritable<>(value));
- else
- this.worker.aggregate(key, new ObjectWritable<>(value));
+ if (this.isMasterCompute) { // only called on setup() and terminate()
+ this.setAggregatedValue(key, new ObjectWritable<>(new Pair<>(this.memoryKeys.get(key).getReducer(), value)));
+ } else {
+ this.worker.aggregate(key, new ObjectWritable<>(new Pair<>(this.memoryKeys.get(key).getReducer(), value)));
+ }
}
@Override
public void add(final String key, final Object value) {
this.checkKeyValue(key, value);
if (this.isMasterCompute) { // only called on setup() and terminate()
- this.setAggregatedValue(key, new ObjectWritable<>(value));
+ this.setAggregatedValue(key, new ObjectWritable<>(new Pair<>(this.memoryKeys.get(key).getReducer(), value)));
} else {
- this.worker.aggregate(key, new ObjectWritable<>(value));
+ this.worker.aggregate(key, new ObjectWritable<>(new Pair<>(this.memoryKeys.get(key).getReducer(), value)));
}
}
@Override
public void write(final DataOutput output) {
- try {
- final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
- final ObjectOutputStream objects = new ObjectOutputStream(bytes);
- output.writeInt(bytes.size());
- objects.writeObject(this.memoryKeys);
- objects.flush();
- output.write(bytes.toByteArray());
- } catch (final Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
+ // all aggregator data is propagated through writables
}
@Override
public void readFields(final DataInput input) {
- try {
- final byte[] in = new byte[input.readInt()];
- for (int i = 0; i < in.length; i++) {
- in[i] = input.readByte();
- }
- final ByteArrayInputStream bytes = new ByteArrayInputStream(in);
- final ObjectInputStream objects = new ObjectInputStream(bytes);
- this.memoryKeys = (Map<String, MemoryComputeKey>) objects.readObject();
- } catch (final Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
+ // all aggregator data is propagated through writables
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0ae584ca/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/MemoryAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/MemoryAggregator.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/MemoryAggregator.java
index 4180d2f..4929546 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/MemoryAggregator.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/MemoryAggregator.java
@@ -20,55 +20,49 @@ package org.apache.tinkerpop.gremlin.giraph.process.computer;
import org.apache.giraph.aggregators.Aggregator;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
+import org.javatuples.Pair;
+
+import java.util.function.BinaryOperator;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class MemoryAggregator implements Aggregator<ObjectWritable> {
+public final class MemoryAggregator implements Aggregator<ObjectWritable<Pair<BinaryOperator, Object>>> {
- private ObjectWritable currentObject;
- private MemoryComputeKey memoryComputeKey;
+ private ObjectWritable<Pair<BinaryOperator, Object>> currentObject = ObjectWritable.<Pair<BinaryOperator, Object>>empty();
public MemoryAggregator() { // for Giraph serialization
}
- public MemoryAggregator(final MemoryComputeKey memoryComputeKey) {
- this.currentObject = ObjectWritable.empty();
- this.memoryComputeKey = memoryComputeKey;
- }
-
@Override
- public ObjectWritable getAggregatedValue() {
+ public ObjectWritable<Pair<BinaryOperator, Object>> getAggregatedValue() {
return this.currentObject;
}
@Override
- public void setAggregatedValue(final ObjectWritable object) {
- if (null == object)
- this.currentObject = ObjectWritable.empty();
- else if (object.get() instanceof MemoryComputeKey)
- this.memoryComputeKey = (MemoryComputeKey) object.get();
- else
+ public void setAggregatedValue(final ObjectWritable<Pair<BinaryOperator, Object>> object) {
+ if (null != object)
this.currentObject = object;
}
@Override
- public void reset() {
- this.currentObject = ObjectWritable.empty();
+ public void aggregate(final ObjectWritable<Pair<BinaryOperator, Object>> object) {
+ if (null == object)
+ return;
+ else if (this.currentObject.isEmpty())
+ this.currentObject = object;
+ else if (!object.isEmpty())
+ this.currentObject.set(new Pair<>(object.get().getValue0(), object.get().getValue0().apply(this.currentObject.get().getValue1(), object.get().getValue1())));
}
@Override
- public ObjectWritable createInitialValue() {
- return ObjectWritable.empty();
+ public void reset() {
+ this.currentObject = ObjectWritable.<Pair<BinaryOperator, Object>>empty();
}
@Override
- public void aggregate(final ObjectWritable object) {
- if (this.currentObject.isEmpty())
- this.currentObject = object;
- else
- this.currentObject.set(this.memoryComputeKey.getReducer().apply(this.currentObject.get(), object.get()));
+ public ObjectWritable<Pair<BinaryOperator, Object>> createInitialValue() {
+ return ObjectWritable.<Pair<BinaryOperator, Object>>empty();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0ae584ca/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
index d63094a..7091062 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
@@ -45,9 +45,7 @@ public final class MapMemory implements Memory.Admin, Serializable {
}
public MapMemory(final Memory otherMemory) {
- otherMemory.keys().forEach(key -> {
- this.memoryMap.put(key, otherMemory.get(key));
- });
+ otherMemory.keys().forEach(key -> this.memoryMap.put(key, otherMemory.get(key)));
this.iteration = otherMemory.getIteration();
}
@@ -56,7 +54,7 @@ public final class MapMemory implements Memory.Admin, Serializable {
}
public void addMapReduceMemoryKey(final MapReduce mapReduce) {
- //this.memoryComputeKeys.add(mapReduce.getMemoryKey());
+ this.memoryComputeKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), MemoryComputeKey.setOperator(),false));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0ae584ca/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
index 9a6f636..94fd7d6 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
@@ -19,6 +19,7 @@
package org.apache.tinkerpop.gremlin.structure.io.gryo;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
import org.apache.tinkerpop.gremlin.process.traversal.Contains;
import org.apache.tinkerpop.gremlin.process.traversal.Path;
@@ -296,7 +297,7 @@ public final class GryoMapper implements Mapper<Kryo> {
add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(AtomicLong.class, null, 79));
add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(DependantMutableMetrics.class, null, 80));
add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Pair.class, kryo -> new PairSerializer(), 88));
- add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(TraversalExplanation.class, null, 106)); // ***LAST ID**
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(TraversalExplanation.class, null, 106));
add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Duration.class, kryo -> new JavaTimeSerializers.DurationSerializer(), 93));
add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Instant.class, kryo -> new JavaTimeSerializers.InstantSerializer(), 94));
@@ -311,6 +312,12 @@ public final class GryoMapper implements Mapper<Kryo> {
add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(YearMonth.class, kryo -> new JavaTimeSerializers.YearMonthSerializer(), 103));
add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(ZonedDateTime.class, kryo -> new JavaTimeSerializers.ZonedDateTimeSerializer(), 104));
add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(ZoneOffset.class, kryo -> new JavaTimeSerializers.ZoneOffsetSerializer(), 105));
+
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.AndOperator.class, null, 107));
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.OrOperator.class, null, 108));
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.SetOperator.class, null, 109));
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.SumLongOperator.class, null, 110));
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.SumIntegerOperator.class, null, 111)); // ***LAST ID**
}};
private final List<IoRegistry> registries = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0ae584ca/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
index a371bc3..5c45400 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
@@ -109,8 +109,8 @@ public final class ObjectWritable<T> implements WritableComparable<ObjectWritabl
return null == this.t;
}
- public static ObjectWritable empty() {
- return new ObjectWritable(null);
+ public static <A> ObjectWritable<A> empty() {
+ return new ObjectWritable<>(null);
}
@Override