You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/02/12 14:02:11 UTC
[45/77] [partial] incubator-tinkerpop git commit: moved com/tinkerpop
directories to org/apache/tinkerpop
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MapReduce.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MapReduce.java
deleted file mode 100644
index b669aa9..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MapReduce.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer;
-
-import com.tinkerpop.gremlin.structure.Vertex;
-import org.apache.commons.configuration.Configuration;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.lang.reflect.Constructor;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Optional;
-
-/**
- * A MapReduce is composed of map(), combine(), and reduce() stages.
- * The map() stage processes the vertices of the {@link com.tinkerpop.gremlin.structure.Graph} in a logically parallel manner.
- * The combine() stage aggregates the values of a particular map emitted key prior to sending across the cluster.
- * The reduce() stage aggregates the values of the combine/map emitted keys for the keys that hash to the current machine in the cluster.
- * The interface presented here is nearly identical to the interface popularized by Hadoop save the the map() is over the vertices of the graph.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface MapReduce<MK, MV, RK, RV, R> extends Cloneable {
-
- public static final String MAP_REDUCE = "gremlin.mapReduce";
-
- /**
- * MapReduce is composed of three stages: map, combine, and reduce.
- */
- public static enum Stage {
- MAP, COMBINE, REDUCE
- }
-
- /**
- * When it is necessary to store the state of a MapReduce job, this method is called.
- * This is typically required when the MapReduce job needs to be serialized to another machine.
- * Note that what is stored is simply the instance state, not any processed data.
- *
- * @param configuration the configuration to store the state of the MapReduce job in.
- */
- public default void storeState(final Configuration configuration) {
- configuration.setProperty(MAP_REDUCE, this.getClass().getName());
- }
-
- /**
- * When it is necessary to load the state of a MapReduce job, this method is called.
- * This is typically required when the MapReduce job needs to be serialized to another machine.
- * Note that what is loaded is simply the instance state, not any processed data.
- * <p/>
- * It is important that the state loaded from loadState() is identical to any state created from a constructor.
- * For those GraphComputers that do not need to use Configurations to migrate state between JVMs, the constructor will only be used.
- *
- * @param configuration the configuration to load the state of the MapReduce job from.
- */
- public default void loadState(final Configuration configuration) {
-
- }
-
- /**
- * A MapReduce job can be map-only, map-reduce-only, or map-combine-reduce.
- * Before executing the particular stage, this method is called to determine if the respective stage is defined.
- * This method should return true if the respective stage as a non-default method implementation.
- *
- * @param stage the stage to check for definition.
- * @return whether that stage should be executed.
- */
- public boolean doStage(final Stage stage);
-
- /**
- * The map() method is logically executed at all vertices in the graph in parallel.
- * The map() method emits key/value pairs given some analysis of the data in the vertices (and/or its incident edges).
- *
- * @param vertex the current vertex being map() processed.
- * @param emitter the component that allows for key/value pairs to be emitted to the next stage.
- */
- public default void map(final Vertex vertex, final MapEmitter<MK, MV> emitter) {
- }
-
- /**
- * The combine() method is logically executed at all "machines" in parallel.
- * The combine() method pre-combines the values for a key prior to propagation over the wire.
- * The combine() method must emit the same key/value pairs as the reduce() method.
- * If there is a combine() implementation, there must be a reduce() implementation.
- * If the MapReduce implementation is single machine, it can skip executing this method as reduce() is sufficient.
- *
- * @param key the key that has aggregated values
- * @param values the aggregated values associated with the key
- * @param emitter the component that allows for key/value pairs to be emitted to the reduce stage.
- */
- public default void combine(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter) {
- }
-
- /**
- * The reduce() method is logically on the "machine" the respective key hashes to.
- * The reduce() method combines all the values associated with the key and emits key/value pairs.
- *
- * @param key the key that has aggregated values
- * @param values the aggregated values associated with the key
- * @param emitter the component that allows for key/value pairs to be emitted as the final result.
- */
- public default void reduce(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter) {
- }
-
- /**
- * If a {@link Comparator} is provided, then all pairs leaving the {@link MapEmitter} are sorted.
- * The sorted results are either fed sorted to the combine/reduce-stage or as the final output.
- * If sorting is not required, then {@link Optional#empty} should be returned as sorting is computationally expensive.
- * The default implementation returns {@link Optional#empty}.
- *
- * @return an {@link Optional} of a comparator for sorting the map output.
- */
- public default Optional<Comparator<MK>> getMapKeySort() {
- return Optional.empty();
- }
-
- /**
- * If a {@link Comparator} is provided, then all pairs leaving the {@link ReduceEmitter} are sorted.
- * If sorting is not required, then {@link Optional#empty} should be returned as sorting is computationally expensive.
- * The default implementation returns {@link Optional#empty}.
- *
- * @return an {@link Optional} of a comparator for sorting the reduce output.
- */
- public default Optional<Comparator<RK>> getReduceKeySort() {
- return Optional.empty();
- }
-
- /**
- * The key/value pairs emitted by reduce() (or map() in a map-only job) can be iterated to generate a local JVM Java object.
- *
- * @param keyValues the key/value pairs that were emitted from reduce() (or map() in a map-only job)
- * @return the resultant object formed from the emitted key/values.
- */
- public R generateFinalResult(final Iterator<KeyValue<RK, RV>> keyValues);
-
- /**
- * The results of the MapReduce job are associated with a memory-key to ultimately be stored in {@link Memory}.
- *
- * @return the memory key of the generated result object.
- */
- public String getMemoryKey();
-
- /**
- * The final result can be generated and added to {@link Memory} and accessible via {@link com.tinkerpop.gremlin.process.computer.util.DefaultComputerResult}.
- * The default simply takes the object from generateFinalResult() and adds it to the Memory given getMemoryKey().
- *
- * @param memory the memory of the {@link GraphComputer}
- * @param keyValues the key/value pairs emitted from reduce() (or map() in a map only job).
- */
- public default void addResultToMemory(final Memory.Admin memory, final Iterator<KeyValue<RK, RV>> keyValues) {
- memory.set(this.getMemoryKey(), this.generateFinalResult(keyValues));
- }
-
- /**
- * When multiple workers on a single machine need MapReduce instances, it is possible to use clone.
- * This will provide a speedier way of generating instances, over the {@link MapReduce#storeState} and {@link MapReduce#loadState} model.
- * The default implementation simply returns the object as it assumes that the MapReduce instance is a stateless singleton.
- *
- * @return A clone of the MapReduce object
- * @throws CloneNotSupportedException
- */
- public MapReduce<MK, MV, RK, RV, R> clone() throws CloneNotSupportedException;
-
- /**
- * A helper method to construct a {@link MapReduce} given the content of the supplied configuration.
- * The class of the MapReduce is read from the {@link MapReduce#MAP_REDUCE} static configuration key.
- * Once the MapReduce is constructed, {@link MapReduce#loadState} method is called with the provided configuration.
- *
- * @param configuration A configuration with requisite information to build a MapReduce
- * @return the newly constructed MapReduce
- */
- public static <M extends MapReduce<MK, MV, RK, RV, R>, MK, MV, RK, RV, R> M createMapReduce(final Configuration configuration) {
- try {
- final Class<M> mapReduceClass = (Class) Class.forName(configuration.getString(MAP_REDUCE));
- final Constructor<M> constructor = mapReduceClass.getDeclaredConstructor();
- constructor.setAccessible(true);
- final M mapReduce = constructor.newInstance();
- mapReduce.loadState(configuration);
- return mapReduce;
- } catch (final Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-
- //////////////////
-
- /**
- * The MapEmitter is used to emit key/value pairs from the map() stage of the MapReduce job.
- * The implementation of MapEmitter is up to the vendor, not the developer.
- *
- * @param <K> the key type
- * @param <V> the value type
- */
- public interface MapEmitter<K, V> {
- public void emit(final K key, final V value);
-
- /**
- * A default method that assumes the key is {@link com.tinkerpop.gremlin.process.computer.MapReduce.NullObject}.
- *
- * @param value the value to emit.
- */
- public default void emit(final V value) {
- this.emit((K) MapReduce.NullObject.instance(), value);
- }
- }
-
- /**
- * The ReduceEmitter is used to emit key/value pairs from the combine() and reduce() stages of the MapReduce job.
- * The implementation of ReduceEmitter is up to the vendor, not the developer.
- *
- * @param <OK> the key type
- * @param <OV> the value type
- */
- public interface ReduceEmitter<OK, OV> {
- public void emit(final OK key, OV value);
-
- /**
- * A default method that assumes the key is {@link com.tinkerpop.gremlin.process.computer.MapReduce.NullObject}.
- *
- * @param value the value to emit.
- */
- public default void emit(final OV value) {
- this.emit((OK) MapReduce.NullObject.instance(), value);
- }
- }
-
- //////////////////
-
- /**
- * A convenience singleton when a single key is needed so that all emitted values converge to the same combiner/reducer.
- */
- public static class NullObject implements Comparable<NullObject>, Serializable {
- private static final NullObject INSTANCE = new NullObject();
- private static final String NULL_OBJECT = "";
-
- public static NullObject instance() {
- return INSTANCE;
- }
-
- @Override
- public int hashCode() {
- return 666666666;
- }
-
- @Override
- public boolean equals(final Object object) {
- return object instanceof NullObject;
- }
-
- @Override
- public int compareTo(final NullObject nullObject) {
- return 0;
- }
-
- @Override
- public String toString() {
- return NULL_OBJECT;
- }
-
- private void readObject(final ObjectInputStream inputStream) throws ClassNotFoundException, IOException {
-
- }
-
- private void writeObject(final ObjectOutputStream outputStream) throws IOException {
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/Memory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/Memory.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/Memory.java
deleted file mode 100644
index 8d573ea..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/Memory.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer;
-
-import com.tinkerpop.gremlin.process.computer.util.ImmutableMemory;
-import org.javatuples.Pair;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * The Memory of a {@link GraphComputer} is a global data structure where by vertices can communicate information with one another.
- * Moreover, it also contains global information about the state of the computation such as runtime and the current iteration.
- * The Memory data is logically updated in parallel using associative/commutative methods which have embarrassingly parallel implementations.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface Memory {
-
- /**
- * Whether the key exists in the memory.
- *
- * @param key key to search the memory for.
- * @return whether the key exists
- */
- public default boolean exists(final String key) {
- return this.keys().contains(key);
- }
-
- /**
- * The set of keys currently associated with this memory.
- *
- * @return the memory's key set.
- */
- public Set<String> keys();
-
- /**
- * Get the value associated with the provided key.
- *
- * @param key the key of the value
- * @param <R> the type of the value
- * @return the value
- * @throws IllegalArgumentException is thrown if the key does not exist
- */
- public <R> R get(final String key) throws IllegalArgumentException;
-
- /**
- * Set the value of the provided key. This is typically called in setup() and/or terminate() of the {@link VertexProgram}.
- * If this is called during execute(), there is no guarantee as to the ultimately stored value as call order is indeterminate.
- *
- * @param key they key to set a value for
- * @param value the value to set for the key
- */
- public void set(final String key, Object value);
-
- /**
- * A helper method that generates a {@link Map} of the memory key/values.
- *
- * @return the map representation of the memory key/values
- */
- public default Map<String, Object> asMap() {
- final Map<String, Object> map = keys().stream()
- .filter(this::exists)
- .map(key -> Pair.with(key, get(key)))
- .collect(Collectors.toMap(kv -> kv.getValue0(), Pair::getValue1));
- return Collections.unmodifiableMap(map);
- }
-
- /**
- * Get the current iteration number.
- *
- * @return the current iteration
- */
- public int getIteration();
-
- /**
- * Get the amount of milliseconds the {@link GraphComputer} has been executing thus far.
- *
- * @return the total time in milliseconds
- */
- public long getRuntime();
-
- /**
- * Add the provided delta value to the long value currently stored at the key.
- *
- * @param key the key of the long value
- * @param delta the adjusting amount (can be negative for decrement)
- * @return the adjusted value (according to the previous iterations current value + delta)
- */
- public long incr(final String key, final long delta);
-
- /**
- * Logically AND the provided boolean value with the boolean value currently stored at the key.
- *
- * @param key the key of the boolean value
- * @param bool the boolean to AND
- * @return the adjusted value (according to the previous iterations current value && bool)
- */
- public boolean and(final String key, final boolean bool);
-
- /**
- * Logically OR the provided boolean value with the boolean value currently stored at the key.
- *
- * @param key the key of the boolean value
- * @param bool the boolean to OR
- * @return the adjusted value (according to the previous iterations current value || bool)
- */
- public boolean or(final String key, final boolean bool);
-
- /**
- * A helper method that states whether the current iteration is 0.
- *
- * @return whether this is the first iteration
- */
- public default boolean isInitialIteration() {
- return this.getIteration() == 0;
- }
-
- /**
- * The Admin interface is used by the {@link GraphComputer} to update the Memory.
- * The developer should never need to type-cast the provided Memory to Memory.Admin.
- */
- public interface Admin extends Memory {
-
- public default void incrIteration() {
- this.setIteration(this.getIteration() + 1);
- }
-
- public void setIteration(final int iteration);
-
- public void setRuntime(final long runtime);
-
- public default Memory asImmutable() {
- return new ImmutableMemory(this);
- }
- }
-
-
- public static class Exceptions {
-
- public static IllegalArgumentException memoryKeyCanNotBeEmpty() {
- return new IllegalArgumentException("Graph computer memory key can not be the empty string");
- }
-
- public static IllegalArgumentException memoryKeyCanNotBeNull() {
- return new IllegalArgumentException("Graph computer memory key can not be null");
- }
-
- public static IllegalArgumentException memoryValueCanNotBeNull() {
- return new IllegalArgumentException("Graph computer memory value can not be null");
- }
-
- public static IllegalStateException memoryIsCurrentlyImmutable() {
- return new IllegalStateException("Graph computer memory is currently immutable");
- }
-
- public static IllegalArgumentException memoryDoesNotExist(final String key) {
- return new IllegalArgumentException("The memory does not have a value for provided key: " + key);
- }
-
- public static UnsupportedOperationException dataTypeOfMemoryValueNotSupported(final Object val) {
- return new UnsupportedOperationException(String.format("Graph computer memory value [%s] is of type %s is not supported", val, val.getClass()));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MessageCombiner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MessageCombiner.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MessageCombiner.java
deleted file mode 100644
index c2d7535..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MessageCombiner.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer;
-
-/**
- * A MessageCombiner allows two messages in route to the same vertex to be aggregated into a single message.
- * Message combining can reduce the number of messages sent between vertices and thus, reduce network traffic.
- * Not all messages can be combined and thus, this is an optional feature of a {@link VertexProgram}.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface MessageCombiner<M> {
-
- /**
- * Combine two messages and return a message containing the combination.
- * In many instances, it is possible to simply merge the data in the second message into the first message.
- * Such an optimization can limit the amount of object creation.
- *
- * @param messageA the first message
- * @param messageB the second message
- * @return the combination of the two messages
- */
- public M combine(final M messageA, final M messageB);
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MessageScope.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MessageScope.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MessageScope.java
deleted file mode 100644
index 75e4965..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MessageScope.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer;
-
-import com.tinkerpop.gremlin.process.Traversal;
-import com.tinkerpop.gremlin.structure.Edge;
-import com.tinkerpop.gremlin.structure.Vertex;
-
-import java.util.Arrays;
-import java.util.function.BiFunction;
-import java.util.function.Supplier;
-
-/**
- * A {@link MessageScope} represents the range of a message. A message can have multiple receivers and message scope
- * allows the underlying {@link GraphComputer} to apply the message passing algorithm in whichever manner is most efficient.
- * It is best to use {@link MessageScope.Local} if possible as that provides more room for optimization by vendors than {@link MessageScope.Global}.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- * @author Matthias Broecheler (me@matthiasb.com)
- */
-public abstract class MessageScope {
-
- /**
- * A Global message is directed at an arbitrary vertex in the graph.
- * The recipient vertex need not be adjacent to the sending vertex.
- * This message scope should be avoided if a {@link Local} can be used.
- */
- public final static class Global extends MessageScope {
-
- private static final Global INSTANCE = Global.of();
-
- private final Iterable<Vertex> vertices;
-
- private Global(final Iterable<Vertex> vertices) {
- this.vertices = vertices;
- }
-
- public static Global of(final Iterable<Vertex> vertices) {
- return new Global(vertices);
- }
-
- public static Global of(final Vertex... vertices) {
- return new Global(Arrays.asList(vertices));
- }
-
- public Iterable<Vertex> vertices() {
- return this.vertices;
- }
-
- public static Global instance() {
- return INSTANCE;
- }
- }
-
- /**
- * A Local message is directed to an adjacent (or "memory adjacent") vertex.
- * The adjacent vertex set is defined by the provided {@link Traversal} that dictates how to go from the sending vertex to the receiving vertex.
- * This is the preferred message scope as it can potentially be optimized by the underlying {@link Messenger} implementation.
- * The preferred optimization is to not distribute a message object to all adjacent vertices.
- * Instead, allow the recipients to read a single message object stored at the "sending" vertex.
- * This is possible via `Traversal.reverse()`. This optimizations greatly reduces the amount of data created in the computation.
- *
- * @param <M> The {@link VertexProgram} message class
- */
- public final static class Local<M> extends MessageScope {
- public final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal;
- public final BiFunction<M, Edge, M> edgeFunction;
-
- private Local(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal) {
- this.incidentTraversal = incidentTraversal;
- this.edgeFunction = (final M m, final Edge e) -> m; // the default is an identity function
- }
-
- private Local(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal, final BiFunction<M, Edge, M> edgeFunction) {
- this.incidentTraversal = incidentTraversal;
- this.edgeFunction = edgeFunction;
- }
-
- public static <M> Local<M> of(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal) {
- return new Local(incidentTraversal);
- }
-
- public static <M> Local<M> of(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal, final BiFunction<M, Edge, M> edgeFunction) {
- return new Local<>(incidentTraversal, edgeFunction);
- }
-
- public BiFunction<M, Edge, M> getEdgeFunction() {
- return this.edgeFunction;
- }
-
- public Supplier<? extends Traversal<Vertex, Edge>> getIncidentTraversal() {
- return this.incidentTraversal;
- }
-
- /**
- * A helper class that can be used to generate the reverse traversal of the traversal within a {@link MessageScope.Local}.
- */
- public static class ReverseTraversalSupplier implements Supplier<Traversal<Vertex, Edge>> {
-
- private final MessageScope.Local<?> localMessageScope;
-
- public ReverseTraversalSupplier(final MessageScope.Local<?> localMessageScope) {
- this.localMessageScope = localMessageScope;
- }
-
- public Traversal<Vertex, Edge> get() {
- return this.localMessageScope.getIncidentTraversal().get().asAdmin().reverse();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/Messenger.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/Messenger.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/Messenger.java
deleted file mode 100644
index 087b61a..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/Messenger.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer;
-
-/**
- * The {@link Messenger} serves as the routing system for messages between vertices. For distributed systems,
- * the messenger can implement a "message passing" engine (distributed memory). For single machine systems, the
- * messenger can implement a "state sharing" engine (shared memory). Each messenger is tied to the particular
- * vertex distributing the message.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- * @author Matthias Broecheler (me@matthiasb.com)
- */
-public interface Messenger<M> {
-
- /**
- * The currently executing vertex can receive the messages of the provided {@link MessageScope}.
- *
- * @param messageScope the message scope of the messages to receive
- * @return the messages for that vertex
- */
- public Iterable<M> receiveMessages(final MessageScope messageScope);
-
- /**
- * The currently executing vertex can send a message with provided {@link MessageScope}.
- *
- * @param messageScope the message scope of the message being sent
- * @param message the message to send
- */
- public void sendMessage(final MessageScope messageScope, final M message);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/VertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/VertexProgram.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/VertexProgram.java
deleted file mode 100644
index c207465..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/VertexProgram.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.tinkerpop.gremlin.process.computer;
-
-import com.tinkerpop.gremlin.structure.Vertex;
-import org.apache.commons.configuration.Configuration;
-
-import java.lang.reflect.Constructor;
-import java.util.Collections;
-import java.util.Optional;
-import java.util.Set;
-
-/**
- * A {@link VertexProgram} represents one component of a distributed graph computation. Each vertex in the graph
- * (logically) executes the {@link VertexProgram} instance in parallel. The collective behavior yields
- * the computational result. In practice, a "worker" (i.e. task, thread, etc.) is responsible for executing the
- * VertexProgram against each vertex that it has in its vertex set (a subset of the full graph vertex set).
- * At minimum there is one "worker" for each vertex, though this is impractical in practice and {@link GraphComputer}
- * implementations that leverage such a design are not expected to perform well due to the excess object creation.
- * Any local state/fields in a VertexProgram is static to the vertices within the same worker set.
- * It is not safe to assume that the VertexProgram's "worker" state will remain stable between iterations.
- * Hence, the existence of {@link VertexProgram#workerIterationStart} and {@link VertexProgram#workerIterationEnd}.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- * @author Matthias Broecheler (me@matthiasb.com)
- */
-public interface VertexProgram<M> extends Cloneable {
-
- public static final String VERTEX_PROGRAM = "gremlin.vertexProgram";
-
- /**
- * When it is necessary to store the state of the VertexProgram, this method is called.
- * This is typically required when the VertexProgram needs to be serialized to another machine.
- * Note that what is stored is simply the instance/configuration state, not any processed data.
- * The default implementation provided simply stores the VertexProgram class name for reflective reconstruction.
- * It is typically a good idea to VertexProgram.super.storeState().
- *
- * @param configuration the configuration to store the state of the VertexProgram in.
- */
- public default void storeState(final Configuration configuration) {
- configuration.setProperty(VERTEX_PROGRAM, this.getClass().getName());
- }
-
- /**
- * When it is necessary to load the state of the VertexProgram, this method is called.
- * This is typically required when the VertexProgram needs to be serialized to another machine.
- * Note that what is loaded is simply the instance state, not any processed data.
- * It is possible for this method to be called for each and every vertex in the graph.
- * Thus, it is important to only store/load that state which is needed during execution in order to reduce startup time.
- *
- * @param configuration the configuration to load the state of the VertexProgram from.
- */
- public default void loadState(final Configuration configuration) {
-
- }
-
- /**
- * The method is called at the beginning of the computation.
- * The method is global to the {@link GraphComputer} and as such, is not called for each vertex.
- * During this stage, the {@link Memory} should be initialized to to its "start state."
- *
- * @param memory The global memory of the GraphComputer
- */
- public void setup(final Memory memory);
-
- /**
- * This method denotes the main body of the computation and is executed on each vertex in the graph.
- * This method is logically executed in parallel on all vertices in the graph.
- * When the {@link Memory} is read, it is according to the aggregated state yielded in the previous iteration.
- * When the {@link Memory} is written, the data will be aggregated at the end of the iteration for reading in the next iteration.
- *
- * @param vertex the {@link Vertex} to execute the {@link VertexProgram} on
- * @param messenger the messenger that moves data between vertices
- * @param memory the shared state between all vertices in the computation
- */
- public void execute(final Vertex vertex, final Messenger<M> messenger, final Memory memory);
-
- /**
- * The method is called at the end of each iteration to determine if the computation is complete.
- * The method is global to the {@link GraphComputer} and as such, is not called for each {@link Vertex}.
- * The {@link Memory} maintains the aggregated data from the last execute() iteration.
- *
- * @param memory The global memory of the {@link GraphComputer}
- * @return whether or not to halt the computation
- */
- public boolean terminate(final Memory memory);
-
- /**
- * This method is called at the start of each iteration of each "computational chunk."
- * The set of vertices in the graph are typically not processed with full parallelism.
- * The vertex set is split into subsets and a worker is assigned to call the {@link VertexProgram#execute} method.
- * The typical use is to create static VertexProgram state that exists for the iteration of the vertex subset.
- * The default implementation is a no-op.
- *
- * @param memory The memory at the start of the iteration.
- */
- public default void workerIterationStart(final Memory memory) {
-
- }
-
- /**
- * This method is called at the end of each iteration of each "computational chunk."
- * The set of vertices in the graph are typically not processed with full parallelism.
- * The vertex set is split into subsets and a worker is assigned to call the {@link VertexProgram#execute} method.
- * The typical use is to destroy static VertexProgram state that existed during the iteration of the vertex subset.
- * The default implementation is a no-op.
- *
- * @param memory The memory at the start of the iteration.
- */
- public default void workerIterationEnd(final Memory memory) {
-
- }
-
- /**
- * The {@link com.tinkerpop.gremlin.structure.Element} properties that will be mutated during the computation.
- * All properties in the graph are readable, but only the keys specified here are writable.
- * The default is an empty set.
- *
- * @return the set of element keys that will be mutated during the vertex program's execution
- */
- public default Set<String> getElementComputeKeys() {
- return Collections.emptySet();
- }
-
- /**
- * The {@link Memory} keys that will be used during the computation.
- * These are the only keys that can be read or written throughout the life of the {@link GraphComputer}.
- * The default is an empty set.
- *
- * @return the set of memory keys that will be read/written
- */
- public default Set<String> getMemoryComputeKeys() {
- return Collections.emptySet();
- }
-
- /**
- * Combine the messages in route to a particular vertex. Useful to reduce the amount of data transmitted over the wire.
- * For example, instead of sending two objects that will ultimately be merged at the vertex destination, merge/combine into one and send that object.
- * If no message combiner is provider, then no messages will be combined.
- * Furthermore, it is not guaranteed the all messages in route to the vertex will be combined and thus, combiner-state should not be used.
- * The result of the vertex program algorithm should be the same regardless of whether message combining is executed or not.
- *
- * @return A optional denoting whether or not their is a message combine associated with the vertex program.
- */
- public default Optional<MessageCombiner<M>> getMessageCombiner() {
- return Optional.empty();
- }
-
- /**
- * This method returns all the {@link MessageScope} possibilities for a particular iteration of the vertex program.
- * The returned messages scopes are the scopes that will be used to send messages during the stated iteration.
- * It is not a requirement that all stated messages scopes be used, just that it is possible that they be used during the iteration.
- *
- * @param memory an immutable form of the {@link Memory}
- * @return all possible message scopes during said vertex program iteration
- */
- public Set<MessageScope> getMessageScopes(final Memory memory);
-
- /**
- * The set of {@link MapReduce} jobs that are associated with the {@link VertexProgram}.
- * This is not necessarily the exhaustive list over the life of the {@link GraphComputer}.
- * If MapReduce jobs are declared by GraphComputer.mapReduce(), they are not contained in this set.
- * The default is an empty set.
- *
- * @return the set of {@link MapReduce} jobs associated with this {@link VertexProgram}
- */
- public default Set<MapReduce> getMapReducers() {
- return Collections.emptySet();
- }
-
- /**
- * When multiple workers on a single machine need VertexProgram instances, it is possible to use clone.
- * This will provide a speedier way of generating instances, over the {@link VertexProgram#storeState} and {@link VertexProgram#loadState} model.
- * The default implementation simply returns the object as it assumes that the VertexProgram instance is a stateless singleton.
- *
- * @return A clone of the VertexProgram object
- * @throws CloneNotSupportedException
- */
- public VertexProgram<M> clone() throws CloneNotSupportedException;
-
- /**
- * A helper method to construct a {@link VertexProgram} given the content of the supplied configuration.
- * The class of the VertexProgram is read from the {@link VertexProgram#VERTEX_PROGRAM} static configuration key.
- * Once the VertexProgram is constructed, {@link VertexProgram#loadState} method is called with the provided configuration.
- *
- * @param configuration A configuration with requisite information to build a vertex program
- * @param <V> The vertex program type
- * @return the newly constructed vertex program
- */
- public static <V extends VertexProgram> V createVertexProgram(final Configuration configuration) {
- try {
- final Class<V> vertexProgramClass = (Class) Class.forName(configuration.getString(VERTEX_PROGRAM));
- final Constructor<V> constructor = vertexProgramClass.getDeclaredConstructor();
- constructor.setAccessible(true);
- final V vertexProgram = constructor.newInstance();
- vertexProgram.loadState(configuration);
- return vertexProgram;
- } catch (final Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-
- public interface Builder {
-
- public Builder configure(final Object... keyValues);
-
- public <P extends VertexProgram> P create();
-
- }
-
- public default Features getFeatures() {
- return new Features() {
- };
- }
-
- public interface Features {
- public default boolean requiresGlobalMessageScopes() {
- return false;
- }
-
- public default boolean requiresLocalMessageScopes() {
- return false;
- }
-
- public default boolean requiresVertexAddition() {
- return false;
- }
-
- public default boolean requiresVertexRemoval() {
- return false;
- }
-
- public default boolean requiresVertexPropertyAddition() {
- return false;
- }
-
- public default boolean requiresVertexPropertyRemoval() {
- return false;
- }
-
- public default boolean requiresEdgeAddition() {
- return false;
- }
-
- public default boolean requiresEdgeRemoval() {
- return false;
- }
-
- public default boolean requiresEdgePropertyAddition() {
- return false;
- }
-
- public default boolean requiresEdgePropertyRemoval() {
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
deleted file mode 100644
index 8d39f64..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer.clustering.peerpressure;
-
-import com.tinkerpop.gremlin.process.computer.KeyValue;
-import com.tinkerpop.gremlin.process.computer.MapReduce;
-import com.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
-import com.tinkerpop.gremlin.structure.Property;
-import com.tinkerpop.gremlin.structure.Vertex;
-import com.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.commons.configuration.Configuration;
-
-import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class ClusterCountMapReduce extends StaticMapReduce<MapReduce.NullObject, Serializable, MapReduce.NullObject, Integer, Integer> {
-
- public static final String CLUSTER_COUNT_MEMORY_KEY = "gremlin.clusterCountMapReduce.memoryKey";
- public static final String DEFAULT_MEMORY_KEY = "clusterCount";
-
- private String memoryKey = DEFAULT_MEMORY_KEY;
-
-
- private ClusterCountMapReduce() {
-
- }
-
- private ClusterCountMapReduce(final String memoryKey) {
- this.memoryKey = memoryKey;
- }
-
- @Override
- public void storeState(final Configuration configuration) {
- super.storeState(configuration);
- configuration.setProperty(CLUSTER_COUNT_MEMORY_KEY, this.memoryKey);
- }
-
- @Override
- public void loadState(final Configuration configuration) {
- this.memoryKey = configuration.getString(CLUSTER_COUNT_MEMORY_KEY, DEFAULT_MEMORY_KEY);
- }
-
- @Override
- public boolean doStage(final Stage stage) {
- return true;
- }
-
- @Override
- public void map(final Vertex vertex, final MapEmitter<NullObject, Serializable> emitter) {
- final Property<Serializable> cluster = vertex.property(PeerPressureVertexProgram.CLUSTER);
- if (cluster.isPresent()) {
- emitter.emit(NullObject.instance(), cluster.value());
- }
- }
-
- @Override
- public void combine(final NullObject key, final Iterator<Serializable> values, final ReduceEmitter<NullObject, Integer> emitter) {
- this.reduce(key, values, emitter);
- }
-
- @Override
- public void reduce(final NullObject key, final Iterator<Serializable> values, final ReduceEmitter<NullObject, Integer> emitter) {
- final Set<Serializable> set = new HashSet<>();
- values.forEachRemaining(set::add);
- emitter.emit(NullObject.instance(), set.size());
-
- }
-
- @Override
- public Integer generateFinalResult(final Iterator<KeyValue<NullObject, Integer>> keyValues) {
- return keyValues.next().getValue();
- }
-
- @Override
- public String getMemoryKey() {
- return this.memoryKey;
- }
-
- @Override
- public String toString() {
- return StringFactory.mapReduceString(this, this.memoryKey);
- }
-
- //////////////////////////////
-
- public static Builder build() {
- return new Builder();
- }
-
- public static class Builder {
-
- private String memoryKey = DEFAULT_MEMORY_KEY;
-
- private Builder() {
-
- }
-
- public Builder memoryKey(final String memoryKey) {
- this.memoryKey = memoryKey;
- return this;
- }
-
- public ClusterCountMapReduce create() {
- return new ClusterCountMapReduce(this.memoryKey);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterPopulationMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterPopulationMapReduce.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterPopulationMapReduce.java
deleted file mode 100644
index a8a400d..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterPopulationMapReduce.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer.clustering.peerpressure;
-
-import com.tinkerpop.gremlin.process.computer.KeyValue;
-import com.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
-import com.tinkerpop.gremlin.structure.Property;
-import com.tinkerpop.gremlin.structure.Vertex;
-import com.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.commons.configuration.Configuration;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class ClusterPopulationMapReduce extends StaticMapReduce<Serializable, Long, Serializable, Long, Map<Serializable, Long>> {
-
- public static final String CLUSTER_POPULATION_MEMORY_KEY = "gremlin.clusterPopulationMapReduce.memoryKey";
- public static final String DEFAULT_MEMORY_KEY = "clusterPopulation";
-
- private String memoryKey = DEFAULT_MEMORY_KEY;
-
- private ClusterPopulationMapReduce() {
- }
-
- private ClusterPopulationMapReduce(final String memoryKey) {
- this.memoryKey = memoryKey;
- }
-
- @Override
- public void storeState(final Configuration configuration) {
- super.storeState(configuration);
- configuration.setProperty(CLUSTER_POPULATION_MEMORY_KEY, this.memoryKey);
- }
-
- @Override
- public void loadState(final Configuration configuration) {
- this.memoryKey = configuration.getString(CLUSTER_POPULATION_MEMORY_KEY, DEFAULT_MEMORY_KEY);
- }
-
- @Override
- public boolean doStage(final Stage stage) {
- return true;
- }
-
- @Override
- public void map(final Vertex vertex, final MapEmitter<Serializable, Long> emitter) {
- final Property<Serializable> cluster = vertex.property(PeerPressureVertexProgram.CLUSTER);
- if (cluster.isPresent()) {
- emitter.emit(cluster.value(), 1l);
- }
- }
-
- @Override
- public void combine(final Serializable key, final Iterator<Long> values, final ReduceEmitter<Serializable, Long> emitter) {
- this.reduce(key, values, emitter);
- }
-
- @Override
- public void reduce(final Serializable key, final Iterator<Long> values, final ReduceEmitter<Serializable, Long> emitter) {
- long count = 0l;
- while (values.hasNext()) {
- count = count + values.next();
- }
- emitter.emit(key, count);
- }
-
- @Override
- public Map<Serializable, Long> generateFinalResult(final Iterator<KeyValue<Serializable, Long>> keyValues) {
- final Map<Serializable, Long> clusterPopulation = new HashMap<>();
- keyValues.forEachRemaining(pair -> clusterPopulation.put(pair.getKey(), pair.getValue()));
- return clusterPopulation;
- }
-
- @Override
- public String getMemoryKey() {
- return this.memoryKey;
- }
-
- @Override
- public String toString() {
- return StringFactory.mapReduceString(this, this.memoryKey);
- }
-
- //////////////////////////////
-
- public static Builder build() {
- return new Builder();
- }
-
- public static class Builder {
-
- private String memoryKey = DEFAULT_MEMORY_KEY;
-
- private Builder() {
-
- }
-
- public Builder memoryKey(final String memoryKey) {
- this.memoryKey = memoryKey;
- return this;
- }
-
- public ClusterPopulationMapReduce create() {
- return new ClusterPopulationMapReduce(this.memoryKey);
- }
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
deleted file mode 100644
index 98fafcc..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer.clustering.peerpressure;
-
-import com.tinkerpop.gremlin.process.Traversal;
-import com.tinkerpop.gremlin.process.computer.Memory;
-import com.tinkerpop.gremlin.process.computer.MessageScope;
-import com.tinkerpop.gremlin.process.computer.Messenger;
-import com.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
-import com.tinkerpop.gremlin.process.computer.util.LambdaHolder;
-import com.tinkerpop.gremlin.process.computer.util.StaticVertexProgram;
-import com.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
-import com.tinkerpop.gremlin.process.graph.traversal.__;
-import com.tinkerpop.gremlin.process.util.MapHelper;
-import com.tinkerpop.gremlin.structure.Edge;
-import com.tinkerpop.gremlin.structure.Vertex;
-import com.tinkerpop.gremlin.structure.VertexProperty;
-import com.tinkerpop.gremlin.structure.util.StringFactory;
-import com.tinkerpop.gremlin.util.StreamFactory;
-import org.apache.commons.configuration.Configuration;
-import org.javatuples.Pair;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Supplier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializable, Double>> {
-
- private MessageScope.Local<?> voteScope = MessageScope.Local.of(__::outE);
- private MessageScope.Local<?> countScope = MessageScope.Local.of(new MessageScope.Local.ReverseTraversalSupplier(this.voteScope));
- private final Set<MessageScope> VOTE_SCOPE = new HashSet<>(Collections.singletonList(this.voteScope));
- private final Set<MessageScope> COUNT_SCOPE = new HashSet<>(Collections.singletonList(this.countScope));
-
- public static final String CLUSTER = "gremlin.peerPressureVertexProgram.cluster";
- public static final String VOTE_STRENGTH = "gremlin.peerPressureVertexProgram.voteStrength";
-
- private static final String MAX_ITERATIONS = "gremlin.peerPressureVertexProgram.maxIterations";
- private static final String DISTRIBUTE_VOTE = "gremlin.peerPressureVertexProgram.distributeVote";
- private static final String INCIDENT_TRAVERSAL_SUPPLIER = "gremlin.peerPressureVertexProgram.incidentTraversalSupplier";
- private static final String VOTE_TO_HALT = "gremlin.peerPressureVertexProgram.voteToHalt";
-
- private LambdaHolder<Supplier<Traversal<Vertex, Edge>>> traversalSupplier;
- private int maxIterations = 30;
- private boolean distributeVote = false;
-
- private static final Set<String> ELEMENT_COMPUTE_KEYS = new HashSet<>(Arrays.asList(CLUSTER, VOTE_STRENGTH));
- private static final Set<String> MEMORY_COMPUTE_KEYS = new HashSet<>(Collections.singletonList(VOTE_TO_HALT));
-
- private PeerPressureVertexProgram() {
-
- }
-
- @Override
- public void loadState(final Configuration configuration) {
- this.traversalSupplier = LambdaHolder.loadState(configuration, INCIDENT_TRAVERSAL_SUPPLIER);
- if (null != this.traversalSupplier) {
- VertexProgramHelper.verifyReversibility(this.traversalSupplier.get().get().asAdmin());
- this.voteScope = MessageScope.Local.of(this.traversalSupplier.get());
- this.countScope = MessageScope.Local.of(new MessageScope.Local.ReverseTraversalSupplier(this.voteScope));
- }
- this.maxIterations = configuration.getInt(MAX_ITERATIONS, 30);
- this.distributeVote = configuration.getBoolean(DISTRIBUTE_VOTE, false);
- }
-
- @Override
- public void storeState(final Configuration configuration) {
- super.storeState(configuration);
- this.traversalSupplier.storeState(configuration);
- configuration.setProperty(MAX_ITERATIONS, this.maxIterations);
- configuration.setProperty(DISTRIBUTE_VOTE, this.distributeVote);
-
- }
-
- @Override
- public Set<String> getElementComputeKeys() {
- return ELEMENT_COMPUTE_KEYS;
- }
-
- @Override
- public Set<String> getMemoryComputeKeys() {
- return MEMORY_COMPUTE_KEYS;
- }
-
- @Override
- public Set<MessageScope> getMessageScopes(final Memory memory) {
- return this.distributeVote && memory.isInitialIteration() ? COUNT_SCOPE : VOTE_SCOPE;
- }
-
- @Override
- public void setup(final Memory memory) {
- memory.set(VOTE_TO_HALT, false);
- }
-
- @Override
- public void execute(final Vertex vertex, Messenger<Pair<Serializable, Double>> messenger, final Memory memory) {
- if (memory.isInitialIteration()) {
- if (this.distributeVote) {
- messenger.sendMessage(this.countScope, Pair.with('c', 1.0d));
- } else {
- double voteStrength = 1.0d;
- vertex.property(VertexProperty.Cardinality.single, CLUSTER, vertex.id());
- vertex.property(VertexProperty.Cardinality.single, VOTE_STRENGTH, voteStrength);
- messenger.sendMessage(this.voteScope, new Pair<>((Serializable) vertex.id(), voteStrength));
- memory.and(VOTE_TO_HALT, false);
- }
- } else if (1 == memory.getIteration() && this.distributeVote) {
- double voteStrength = 1.0d / StreamFactory.stream(messenger.receiveMessages(this.countScope)).map(Pair::getValue1).reduce(0.0d, (a, b) -> a + b);
- vertex.property(VertexProperty.Cardinality.single, CLUSTER, vertex.id());
- vertex.property(VertexProperty.Cardinality.single, VOTE_STRENGTH, voteStrength);
- messenger.sendMessage(this.voteScope, new Pair<>((Serializable) vertex.id(), voteStrength));
- memory.and(VOTE_TO_HALT, false);
- } else {
- final Map<Serializable, Double> votes = new HashMap<>();
- votes.put(vertex.value(CLUSTER), vertex.<Double>value(VOTE_STRENGTH));
- messenger.receiveMessages(this.voteScope).forEach(message -> MapHelper.incr(votes, message.getValue0(), message.getValue1()));
- Serializable cluster = PeerPressureVertexProgram.largestCount(votes);
- if (null == cluster) cluster = (Serializable) vertex.id();
- memory.and(VOTE_TO_HALT, vertex.value(CLUSTER).equals(cluster));
- vertex.property(VertexProperty.Cardinality.single, CLUSTER, cluster);
- messenger.sendMessage(this.voteScope, new Pair<>(cluster, vertex.<Double>value(VOTE_STRENGTH)));
- }
- }
-
- @Override
- public boolean terminate(final Memory memory) {
- final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT) || memory.getIteration() >= (this.distributeVote ? this.maxIterations + 1 : this.maxIterations);
- if (voteToHalt) {
- return true;
- } else {
- memory.or(VOTE_TO_HALT, true);
- return false;
- }
- }
-
- private static <T> T largestCount(final Map<T, Double> map) {
- T largestKey = null;
- double largestValue = Double.MIN_VALUE;
- for (Map.Entry<T, Double> entry : map.entrySet()) {
- if (entry.getValue() == largestValue) {
- if (null != largestKey && largestKey.toString().compareTo(entry.getKey().toString()) > 0) {
- largestKey = entry.getKey();
- largestValue = entry.getValue();
- }
- } else if (entry.getValue() > largestValue) {
- largestKey = entry.getKey();
- largestValue = entry.getValue();
- }
- }
- return largestKey;
- }
-
- @Override
- public String toString() {
- return StringFactory.vertexProgramString(this, "distributeVote=" + this.distributeVote + ",maxIterations=" + this.maxIterations);
- }
-
- //////////////////////////////
-
- public static Builder build() {
- return new Builder();
- }
-
- public static class Builder extends AbstractVertexProgramBuilder<Builder> {
-
-
- private Builder() {
- super(PeerPressureVertexProgram.class);
- }
-
- public Builder maxIterations(final int iterations) {
- this.configuration.setProperty(MAX_ITERATIONS, iterations);
- return this;
- }
-
- public Builder distributeVote(final boolean distributeVote) {
- this.configuration.setProperty(DISTRIBUTE_VOTE, distributeVote);
- return this;
- }
-
- public Builder incident(final String scriptEngine, final String traversalScript) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, INCIDENT_TRAVERSAL_SUPPLIER, new String[]{scriptEngine, traversalScript});
- return this;
- }
-
- public Builder incident(final String traversalScript) {
- return incident(GREMLIN_GROOVY, traversalScript);
- }
-
- public Builder incident(final Supplier<Traversal<Vertex, Edge>> traversal) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, INCIDENT_TRAVERSAL_SUPPLIER, traversal);
- return this;
- }
-
- public Builder incident(final Class<Supplier<Traversal<Vertex, Edge>>> traversalClass) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, INCIDENT_TRAVERSAL_SUPPLIER, traversalClass);
- return this;
- }
-
- }
-
- ////////////////////////////
-
- @Override
- public Features getFeatures() {
- return new Features() {
- @Override
- public boolean requiresLocalMessageScopes() {
- return true;
- }
-
- @Override
- public boolean requiresVertexPropertyAddition() {
- return true;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/lambda/LambdaMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/lambda/LambdaMapReduce.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/lambda/LambdaMapReduce.java
deleted file mode 100644
index 13caa2f..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/lambda/LambdaMapReduce.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer.lambda;
-
-import com.tinkerpop.gremlin.process.computer.KeyValue;
-import com.tinkerpop.gremlin.process.computer.MapReduce;
-import com.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
-import com.tinkerpop.gremlin.process.computer.util.LambdaHolder;
-import com.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
-import com.tinkerpop.gremlin.structure.Vertex;
-import com.tinkerpop.gremlin.structure.util.StringFactory;
-import com.tinkerpop.gremlin.util.function.TriConsumer;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Optional;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-import java.util.function.Supplier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class LambdaMapReduce<MK, MV, RK, RV, R> extends StaticMapReduce<MK, MV, RK, RV, R> {
-
- public static final String MAP_LAMBDA = "gremlin.lambdaMapReduce.mapLambda";
- public static final String MAP_KEY_SORT = "gremlin.lambdaMapReduce.mapKeySort";
- public static final String COMBINE_LAMBDA = "gremlin.lambdaMapReduce.combineLambda";
- public static final String REDUCE_LAMBDA = "gremlin.lambdaMapReduce.reduceLambda";
- public static final String REDUCE_KEY_SORT = "gremlin.lambdaMapReduce.reduceKeySort";
- public static final String MEMORY_LAMBDA = "gremlin.lambdaMapReduce.memoryLambda";
- public static final String MEMORY_KEY = "gremlin.lambdaMapReduce.memoryKey";
-
- private LambdaHolder<BiConsumer<Vertex, MapEmitter<MK, MV>>> mapLambdaHolder;
- private LambdaHolder<Supplier<Comparator<MK>>> mapKeySortLambdaHolder;
- private LambdaHolder<TriConsumer<MK, Iterator<MV>, ReduceEmitter<RK, RV>>> combineLambdaHolder;
- private LambdaHolder<TriConsumer<MK, Iterator<MV>, ReduceEmitter<RK, RV>>> reduceLambdaHolder;
- private LambdaHolder<Supplier<Comparator<RK>>> reduceKeySortLambdaHolder;
- private LambdaHolder<Function<Iterator<KeyValue<RK, RV>>, R>> memoryLambdaHolder;
- private String memoryKey;
-
- private LambdaMapReduce() {
-
- }
-
- @Override
- public void loadState(final Configuration configuration) {
- this.mapLambdaHolder = LambdaHolder.loadState(configuration, MAP_LAMBDA);
- this.mapKeySortLambdaHolder = LambdaHolder.loadState(configuration, MAP_KEY_SORT);
- this.combineLambdaHolder = LambdaHolder.loadState(configuration, COMBINE_LAMBDA);
- this.reduceLambdaHolder = LambdaHolder.loadState(configuration, REDUCE_LAMBDA);
- this.reduceKeySortLambdaHolder = LambdaHolder.loadState(configuration, REDUCE_KEY_SORT);
- this.memoryLambdaHolder = LambdaHolder.loadState(configuration, MEMORY_LAMBDA);
- this.memoryKey = configuration.getString(MEMORY_KEY, null);
- }
-
- @Override
- public void storeState(final Configuration configuration) {
- super.storeState(configuration);
- if (null != this.mapLambdaHolder)
- this.mapLambdaHolder.storeState(configuration);
- if (null != this.mapKeySortLambdaHolder)
- this.mapKeySortLambdaHolder.storeState(configuration);
- if (null != this.combineLambdaHolder)
- this.combineLambdaHolder.storeState(configuration);
- if (null != this.reduceLambdaHolder)
- this.reduceLambdaHolder.storeState(configuration);
- if (null != this.reduceKeySortLambdaHolder)
- this.reduceKeySortLambdaHolder.storeState(configuration);
- if (null != this.memoryLambdaHolder)
- this.memoryLambdaHolder.storeState(configuration);
- configuration.setProperty(MEMORY_KEY, this.memoryKey);
- }
-
- @Override
- public boolean doStage(final Stage stage) {
- if (stage.equals(Stage.MAP))
- return null != this.mapLambdaHolder;
- else if (stage.equals(Stage.COMBINE))
- return null != this.combineLambdaHolder;
- else
- return null != this.reduceLambdaHolder;
- }
-
- @Override
- public void map(final Vertex vertex, final MapEmitter<MK, MV> emitter) {
- this.mapLambdaHolder.get().accept(vertex, emitter);
- }
-
- @Override
- public void combine(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter) {
- this.combineLambdaHolder.get().accept(key, values, emitter);
- }
-
- @Override
- public void reduce(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter) {
- this.reduceLambdaHolder.get().accept(key, values, emitter);
- }
-
- @Override
- public Optional<Comparator<MK>> getMapKeySort() {
- return null == this.mapKeySortLambdaHolder ? Optional.empty() : Optional.of(this.mapKeySortLambdaHolder.get().get());
- }
-
- @Override
- public Optional<Comparator<RK>> getReduceKeySort() {
- return null == this.reduceKeySortLambdaHolder ? Optional.empty() : Optional.of(this.reduceKeySortLambdaHolder.get().get());
- }
-
- @Override
- public R generateFinalResult(final Iterator<KeyValue<RK, RV>> keyValues) {
- return null == this.memoryLambdaHolder ? (R) keyValues : this.memoryLambdaHolder.get().apply(keyValues);
- }
-
- @Override
- public String getMemoryKey() {
- return this.memoryKey;
- }
-
- @Override
- public String toString() {
- return StringFactory.mapReduceString(this, this.memoryKey);
- }
-
- //////////////////
-
- public static <MK, MV, RK, RV, R> Builder<MK, MV, RK, RV, R> build() {
- return new Builder<>();
- }
-
- public static class Builder<MK, MV, RK, RV, R> {
-
- private final Configuration configuration = new BaseConfiguration();
-
- public Builder<MK, MV, RK, RV, R> map(final BiConsumer<Vertex, MapReduce.MapEmitter<MK, MV>> mapLambda) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, MAP_LAMBDA, mapLambda);
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> map(final Class<? extends BiConsumer<Vertex, MapReduce.MapEmitter<MK, MV>>> mapClass) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, MAP_LAMBDA, mapClass);
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> map(final String scriptEngine, final String mapScript) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, MAP_LAMBDA, new String[]{scriptEngine, mapScript});
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> map(final String setupScript) {
- return map(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
- }
-
- //
-
- public Builder<MK, MV, RK, RV, R> mapKeySort(final Comparator<MK> comparator) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, MAP_KEY_SORT, comparator);
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> mapKeySort(final Class<? extends Comparator<MK>> comparatorClass) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, MAP_KEY_SORT, comparatorClass);
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> mapKeySort(final String scriptEngine, final String reduceScript) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, MAP_KEY_SORT, new String[]{scriptEngine, reduceScript});
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> mapKeySort(final String setupScript) {
- return mapKeySort(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
- }
-
- ////////////
-
- public Builder<MK, MV, RK, RV, R> combine(TriConsumer<MK, Iterator<MV>, MapReduce.ReduceEmitter<RK, RV>> combineLambda) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, COMBINE_LAMBDA, combineLambda);
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> combine(final Class<? extends TriConsumer<MK, Iterator<MV>, MapReduce.ReduceEmitter<RK, RV>>> combineClass) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, COMBINE_LAMBDA, combineClass);
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> combine(final String scriptEngine, final String combineScript) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, COMBINE_LAMBDA, new String[]{scriptEngine, combineScript});
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> combine(final String setupScript) {
- return combine(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
- }
-
- ////////////
-
- public Builder<MK, MV, RK, RV, R> reduce(TriConsumer<MK, Iterator<MV>, MapReduce.ReduceEmitter<RK, RV>> reduceLambda) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, REDUCE_LAMBDA, reduceLambda);
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> reduce(Class<? extends TriConsumer<MK, Iterator<MV>, MapReduce.ReduceEmitter<RK, RV>>> reduceClass) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, REDUCE_LAMBDA, reduceClass);
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> reduce(final String scriptEngine, final String reduceScript) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, REDUCE_LAMBDA, new String[]{scriptEngine, reduceScript});
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> reduce(final String setupScript) {
- return reduce(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
- }
-
- //
-
- public Builder<MK, MV, RK, RV, R> reduceKeySort(final Supplier<Comparator<RK>> comparator) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, REDUCE_KEY_SORT, comparator);
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> reduceKeySort(final Class<? extends Supplier<Comparator<RK>>> comparatorClass) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, REDUCE_KEY_SORT, comparatorClass);
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> reduceKeySort(final String scriptEngine, final String reduceScript) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, REDUCE_KEY_SORT, new String[]{scriptEngine, reduceScript});
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> reduceKeySort(final String setupScript) {
- return reduceKeySort(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
- }
-
- ////////////
-
- public Builder<MK, MV, RK, RV, R> memory(Function<Iterator<KeyValue<RK, RV>>, R> memoryLambda) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, MEMORY_LAMBDA, memoryLambda);
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> memory(Class<? extends Function<Iterator<KeyValue<RK, RV>>, R>> memoryClass) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, MEMORY_LAMBDA, memoryClass);
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> memory(final String scriptEngine, final String memoryScript) {
- LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, MEMORY_LAMBDA, new String[]{scriptEngine, memoryScript});
- return this;
- }
-
- public Builder<MK, MV, RK, RV, R> memory(final String setupScript) {
- return memory(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
- }
-
- ////////////
-
- public Builder<MK, MV, RK, RV, R> memoryKey(final String memoryKey) {
- this.configuration.setProperty(LambdaMapReduce.MEMORY_KEY, memoryKey);
- return this;
- }
-
- public LambdaMapReduce<MK, MV, RK, RV, R> create() {
- LambdaMapReduce<MK, MV, RK, RV, R> lambdaMapReduce = new LambdaMapReduce<>();
- lambdaMapReduce.loadState(this.configuration);
- return lambdaMapReduce;
- }
- }
-}