You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/02/12 14:01:50 UTC
[24/77] [partial] incubator-tinkerpop git commit: moved com/tinkerpop
directories to org/apache/tinkerpop
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traversal.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traversal.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traversal.java
new file mode 100644
index 0000000..870c155
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traversal.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process;
+
+import com.tinkerpop.gremlin.process.computer.ComputerResult;
+import com.tinkerpop.gremlin.process.computer.GraphComputer;
+import com.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import com.tinkerpop.gremlin.process.computer.traversal.step.map.ComputerResultStep;
+import com.tinkerpop.gremlin.process.graph.traversal.GraphTraversal;
+import com.tinkerpop.gremlin.process.traversal.DefaultTraversal;
+import com.tinkerpop.gremlin.process.traversal.step.EmptyStep;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+import com.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import com.tinkerpop.gremlin.process.util.BulkSet;
+import com.tinkerpop.gremlin.structure.Graph;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link Traversal} represents a directed walk over a {@link Graph}.
+ * This is the base interface for all traversal's, where each extending interface is seen as a domain specific language.
+ * For example, {@link GraphTraversal} is a domain specific language for traversing a graph using "graph concepts" (e.g. vertices, edges).
+ * Another example may represent the graph using "social concepts" (e.g. people, cities, artifacts).
+ * A {@link Traversal} is evaluated in one of two ways: {@link TraversalEngine#STANDARD} (OLTP) and {@link TraversalEngine#COMPUTER} (OLAP).
+ * OLTP traversals leverage an iterator and are executed within a single JVM (with data access allowed to be remote).
+ * OLAP traversals leverage {@link GraphComputer} and are executed between multiple JVMs (and/or cores).
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Traversal<S, E> extends Iterator<E>, Serializable, Cloneable {
+
+ /**
+ * Used for reflection based access to the static "of" method of a Traversal.
+ */
+ public static final String OF = "of";
+
+ /**
+ * Get access to administrative methods of the traversal via its accompanying {@link Traversal.Admin}.
+ *
+ * @return the admin of this traversal
+ */
+ public default Traversal.Admin<S, E> asAdmin() {
+ return (Traversal.Admin<S, E>) this;
+ }
+
+ /**
+ * Submit the traversal to a {@link GraphComputer} for OLAP execution.
+ * This method should execute the traversal via {@link TraversalVertexProgram}.
+ * It should then wrap the {@link com.tinkerpop.gremlin.process.computer.util.DefaultComputerResult} in a new {@link Traversal} containing a {@link ComputerResultStep}.
+ *
+ * @param computer the GraphComputer to execute the traversal on
+ * @return a new traversal with the starts being the results of the TraversalVertexProgram
+ */
+ public default Traversal<S, E> submit(final GraphComputer computer) {
+ try {
+ final TraversalVertexProgram vertexProgram = TraversalVertexProgram.build().traversal(this.asAdmin()).create();
+ final ComputerResult result = computer.program(vertexProgram).submit().get();
+ final Traversal.Admin<S, S> traversal = new DefaultTraversal<>(result.graph().getClass());
+ return traversal.asAdmin().addStep(new ComputerResultStep<>(traversal, result, vertexProgram, true));
+ } catch (final Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Return an {@link Optional} of the next E object in the traversal.
+ * If the traversal is empty, then an {@link Optional#empty()} is returned.
+ *
+ * @return an optional of the next object in the traversal
+ */
+ public default Optional<E> tryNext() {
+ return this.hasNext() ? Optional.of(this.next()) : Optional.empty();
+ }
+
+ /**
+ * Get the next n-number of results from the traversal.
+ * If the traversal has less than n-results, then only that number of results are returned.
+ *
+ * @param amount the number of results to get
+ * @return the n-results in a {@link List}
+ */
+ public default List<E> next(final int amount) {
+ final List<E> result = new ArrayList<>();
+ int counter = 0;
+ while (counter++ < amount && this.hasNext()) {
+ result.add(this.next());
+ }
+ return result;
+ }
+
+ /**
+ * Put all the results into an {@link ArrayList}.
+ *
+ * @return the results in a list
+ */
+ public default List<E> toList() {
+ return this.fill(new ArrayList<>());
+ }
+
+ /**
+ * Put all the results into a {@link HashSet}.
+ *
+ * @return the results in a set
+ */
+ public default Set<E> toSet() {
+ return this.fill(new HashSet<>());
+ }
+
+ /**
+ * Put all the results into a {@link BulkSet}.
+ * This can reduce both time and space when aggregating results by ensuring a weighted set.
+ *
+ * @return the results in a bulk set
+ */
+ public default BulkSet<E> toBulkSet() {
+ return this.fill(new BulkSet<>());
+ }
+
+ /**
+ * Add all the results of the traversal to the provided collection.
+ *
+ * @param collection the collection to fill
+ * @return the collection now filled
+ */
+ public default <C extends Collection<E>> C fill(final C collection) {
+ try {
+ if (!this.asAdmin().getEngine().isPresent())
+ this.asAdmin().applyStrategies(TraversalEngine.STANDARD);
+ // use the end step so the results are bulked
+ final Step<?, E> endStep = this.asAdmin().getEndStep();
+ while (true) {
+ final Traverser<E> traverser = endStep.next();
+ TraversalHelper.addToCollection(collection, traverser.get(), traverser.bulk());
+ }
+ } catch (final NoSuchElementException ignored) {
+ }
+ return collection;
+ }
+
+ /**
+ * Iterate all the {@link Traverser} instances in the traversal.
+ * What is returned is the empty traversal.
+ * It is assumed that what is desired from the computation is are the sideEffects yielded by the traversal.
+ *
+ * @return the fully drained traversal
+ */
+ public default <A, B> Traversal<A, B> iterate() {
+ try {
+ if (!this.asAdmin().getEngine().isPresent())
+ this.asAdmin().applyStrategies(TraversalEngine.STANDARD);
+ // use the end step so the results are bulked
+ final Step<?, E> endStep = this.asAdmin().getEndStep();
+ while (true) {
+ endStep.next();
+ }
+ } catch (final NoSuchElementException ignored) {
+ }
+ return (Traversal<A, B>) this;
+ }
+
+ /**
+ * A traversal can be rewritten such that its defined end type E may yield objects of a different type.
+ * This helper method allows for the casting of the output to the known the type.
+ *
+ * @param endType the true output type of the traversal
+ * @param consumer a {@link Consumer} to process each output
+ * @param <E2> the known output type of the traversal
+ */
+ public default <E2> void forEachRemaining(final Class<E2> endType, final Consumer<E2> consumer) {
+ try {
+ while (true) {
+ consumer.accept((E2) next());
+ }
+ } catch (final NoSuchElementException ignore) {
+
+ }
+ }
+
+ /**
+ * A collection of {@link Exception} types associated with Traversal execution.
+ */
+ public static class Exceptions {
+
+ public static IllegalStateException traversalIsLocked() {
+ return new IllegalStateException("The traversal strategies are complete and the traversal can no longer be modulated");
+ }
+
+ public static IllegalStateException traversalIsNotReversible() {
+ return new IllegalStateException("The traversal is not reversible as it contains steps that are not reversible");
+ }
+ }
+
+ public interface Admin<S, E> extends Traversal<S, E> {
+
+ /**
+ * Add an iterator of {@link Traverser} objects to the head/start of the traversal.
+ * Users should typically not need to call this method. For dynamic inject of data, they should use {@link com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.InjectStep}.
+ *
+ * @param starts an iterators of traversers
+ */
+ public default void addStarts(final Iterator<Traverser<S>> starts) {
+ if(!this.getEngine().isPresent()) this.applyStrategies(TraversalEngine.STANDARD);
+ this.getStartStep().addStarts(starts);
+ }
+
+ /**
+ * Add a single {@link Traverser} object to the head of the traversal.
+ * Users should typically not need to call this method. For dynamic inject of data, they should use {@link com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.InjectStep}.
+ *
+ * @param start a traverser to add to the traversal
+ */
+ public default void addStart(final Traverser<S> start) {
+ if(!this.getEngine().isPresent()) this.applyStrategies(TraversalEngine.STANDARD);
+ this.getStartStep().addStart(start);
+ }
+
+ /**
+ * Get the {@link Step} instances associated with this traversal.
+ * The steps are ordered according to their linked list structure as defined by {@link Step#getPreviousStep()} and {@link Step#getNextStep()}.
+ *
+ * @return the ordered steps of the traversal
+ */
+ public List<Step> getSteps();
+
+ /**
+ * Add a {@link Step} to the end of the traversal. This method should link the step to its next and previous step accordingly.
+ *
+ * @param step the step to add
+ * @param <E2> the output of the step
+ * @return the updated traversal
+ * @throws IllegalStateException if the {@link TraversalStrategies} have already been applied
+ */
+ public default <E2> Traversal.Admin<S, E2> addStep(final Step<?, E2> step) throws IllegalStateException {
+ return this.addStep(this.getSteps().size(), step);
+ }
+
+ /**
+ * Add a {@link Step} to an arbitrary point in the traversal.
+ *
+ * @param index the location in the traversal to insert the step
+ * @param step the step to add
+ * @param <S2> the new start type of the traversal (if the added step was a start step)
+ * @param <E2> the new end type of the traversal (if the added step was an end step)
+ * @return the newly modulated traversal
+ * @throws IllegalStateException if the {@link TraversalStrategies} have already been applied
+ */
+ public <S2, E2> Traversal.Admin<S2, E2> addStep(final int index, final Step<?, ?> step) throws IllegalStateException;
+
+ /**
+ * Remove a {@link Step} from the traversal.
+ *
+ * @param step the step to remove
+ * @param <S2> the new start type of the traversal (if the removed step was a start step)
+ * @param <E2> the new end type of the traversal (if the removed step was an end step)
+ * @return the newly modulated traversal
+ * @throws IllegalStateException if the {@link TraversalStrategies} have already been applied
+ */
+ public default <S2, E2> Traversal.Admin<S2, E2> removeStep(final Step<?, ?> step) throws IllegalStateException {
+ return this.removeStep(this.getSteps().indexOf(step));
+ }
+
+ /**
+ * Remove a {@link Step} from the traversal.
+ *
+ * @param index the location in the traversal of the step to be evicted
+ * @param <S2> the new start type of the traversal (if the removed step was a start step)
+ * @param <E2> the new end type of the traversal (if the removed step was an end step)
+ * @return the newly modulated traversal
+ * @throws IllegalStateException if the {@link TraversalStrategies} have already been applied
+ */
+ public <S2, E2> Traversal.Admin<S2, E2> removeStep(final int index) throws IllegalStateException;
+
+ /**
+ * Get the start/head of the traversal. If the traversal is empty, then an {@link EmptyStep} instance is returned.
+ *
+ * @return the start step of the traversal
+ */
+ public default Step<S, ?> getStartStep() {
+ final List<Step> steps = this.getSteps();
+ return steps.isEmpty() ? EmptyStep.instance() : steps.get(0);
+ }
+
+ /**
+ * Get the end/tail of the traversal. If the traversal is empty, then an {@link EmptyStep} instance is returned.
+ *
+ * @return the end step of the traversal
+ */
+ public default Step<?, E> getEndStep() {
+ final List<Step> steps = this.getSteps();
+ return steps.isEmpty() ? EmptyStep.instance() : steps.get(steps.size() - 1);
+ }
+
+ /**
+ * Apply the registered {@link TraversalStrategies} to the traversal.
+ * Once the strategies are applied, the traversal is "locked" and can no longer have steps added to it.
+ * The order of operations for strategy applications should be: globally id steps, apply strategies to root traversal, then to nested traversals.
+ *
+ * @param engine the engine that will ultimately execute the traversal.
+ * @throws IllegalStateException if the {@link TraversalStrategies} have already been applied
+ */
+ public void applyStrategies(final TraversalEngine engine) throws IllegalStateException;
+
+ /**
+ * When the {@link TraversalStrategies} have been applied, the destined {@link TraversalEngine} has been declared.
+ * Once a traversal engine has been declared, the traversal can no longer be extended, only executed.
+ *
+ * @return whether the traversal engine has been defined or not.
+ */
+ public Optional<TraversalEngine> getEngine();
+
+ /**
+ * Get the {@link TraverserGenerator} associated with this traversal.
+ * The traversal generator creates {@link Traverser} instances that are respective of the traversal's {@link com.tinkerpop.gremlin.process.traverser.TraverserRequirement}.
+ *
+ * @return the generator of traversers
+ */
+ public default TraverserGenerator getTraverserGenerator() {
+ return this.getStrategies().getTraverserGeneratorFactory().getTraverserGenerator(this);
+ }
+
+ /**
+ * Get the set of all {@link TraverserRequirement}s for this traversal.
+ *
+ * @return the features of a traverser that are required to execute properly in this traversal
+ */
+ public default Set<TraverserRequirement> getTraverserRequirements() {
+ final Set<TraverserRequirement> requirements = this.getSteps().stream()
+ .flatMap(step -> ((Step<?, ?>) step).getRequirements().stream())
+ .collect(Collectors.toSet());
+ if (this.getSideEffects().keys().size() > 0)
+ requirements.add(TraverserRequirement.SIDE_EFFECTS);
+ if (this.getSideEffects().getSackInitialValue().isPresent())
+ requirements.add(TraverserRequirement.SACK);
+ if (this.getEngine().isPresent() && this.getEngine().get().equals(TraversalEngine.COMPUTER))
+ requirements.add(TraverserRequirement.BULK);
+ return requirements;
+ }
+
+ /**
+ * Call the {@link Step#reset} method on every step in the traversal.
+ */
+ public default void reset() {
+ this.getSteps().forEach(Step::reset);
+ }
+
+ /**
+ * Assume the every {@link Step} implements {@link Reversible} and call {@link Reversible#reverse()} for each.
+ *
+ * @return the traversal with its steps reversed
+ */
+ public default Traversal.Admin<S, E> reverse() throws IllegalStateException {
+ if (!TraversalHelper.isReversible(this)) throw Exceptions.traversalIsNotReversible();
+ this.getSteps().stream().forEach(step -> ((Reversible) step).reverse());
+ return this;
+ }
+
+ /**
+ * Set the {@link TraversalSideEffects} of this traversal.
+ *
+ * @param sideEffects the sideEffects to set for this traversal.
+ */
+ public void setSideEffects(final TraversalSideEffects sideEffects);
+
+ /**
+ * Get the {@link TraversalSideEffects} associated with the traversal.
+ *
+ * @return The traversal sideEffects
+ */
+ public TraversalSideEffects getSideEffects();
+
+ /**
+ * Set the {@link TraversalStrategies} to be used by this traversal at evaluation time.
+ *
+ * @param strategies the strategies to use on this traversal
+ */
+ public void setStrategies(final TraversalStrategies strategies);
+
+ /**
+ * Get the {@link TraversalStrategies} associated with this traversal.
+ *
+ * @return the strategies associated with this traversal
+ */
+ public TraversalStrategies getStrategies();
+
+ /**
+ * Set the {@link com.tinkerpop.gremlin.process.traversal.step.TraversalParent} {@link Step} that is the parent of this traversal.
+ * Traversals can be nested and this is the means by which the traversal tree is connected.
+ *
+ * @param step the traversal holder parent step
+ */
+ public void setParent(final TraversalParent step);
+
+ /**
+ * Get the {@link com.tinkerpop.gremlin.process.traversal.step.TraversalParent} {@link Step} that is the parent of this traversal.
+ * Traversals can be nested and this is the means by which the traversal tree is walked.
+ *
+ * @return the traversal holder parent step
+ */
+ public TraversalParent getParent();
+
+ /**
+ * Cloning is used to duplicate the traversal typically in OLAP environments.
+ *
+ * @return The cloned traversal
+ */
+ public Traversal.Admin<S, E> clone() throws CloneNotSupportedException;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalEngine.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalEngine.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalEngine.java
new file mode 100644
index 0000000..ad14db6
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalEngine.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public enum TraversalEngine {
+
+ STANDARD, COMPUTER
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalSideEffects.java
new file mode 100644
index 0000000..f44e287
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalSideEffects.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process;
+
+import com.tinkerpop.gremlin.structure.Vertex;
+
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface TraversalSideEffects extends Cloneable, Serializable {
+
+ public static final String SIDE_EFFECTS = "gremlin.traversal.sideEffects";
+
+ /**
+ * Determines if the {@link TraversalSideEffects} contains the respective key.
+ * If the key references a stored {@link java.util.function.Supplier}, then it should return true as it will be dynamically created on get().
+ *
+ * @param key the key to check for
+ * @return whether the key exists in the sideEffects
+ */
+ public default boolean exists(final String key) {
+ return this.keys().contains(key);
+ }
+
+ /**
+ * Set the specified key to the specified value.
+ * If a {@link java.util.function.Supplier} is provided, it is NOT assumed to be a supplier as set by registerSupplier().
+ *
+ * @param key the key
+ * @param value the value
+ */
+ public void set(final String key, final Object value);
+
+ /**
+ * Get the sideEffect associated with the provided key.
+ * If the sideEffect contains an object for the key, return it.
+ * Else if the sideEffect has a registered {@link java.util.function.Supplier} for that key, generate the object, store the object in the sideEffects, and return it.
+ *
+ * @param key the key to get the value for
+ * @param <V> the type of the value to retrieve
+ * @return the value associated with key
+ * @throws IllegalArgumentException if the key does not reference an object or a registered supplier.
+ */
+ public <V> V get(final String key) throws IllegalArgumentException;
+
+ /**
+ * Return the value associated with the key or return the provided otherValue.
+ * The otherValue will not be stored in the sideEffect.
+ *
+ * @param key the key to get the value for
+ * @param otherValue if not value is associated with key, return the other value.
+ * @param <V> the type of the value to get
+ * @return the value associated with the key or the otherValue
+ */
+ public default <V> V orElse(final String key, final V otherValue) {
+ return this.exists(key) ? this.get(key) : otherValue;
+ }
+
+ /**
+ * If a value or registered {@link java.util.function.Supplier} exists for the provided key, consume it with the provided consumer.
+ *
+ * @param key the key to the value
+ * @param consumer the consumer to process the value
+ * @param <V> the type of the value to consume
+ */
+ public default <V> void ifPresent(final String key, final Consumer<V> consumer) {
+ if (this.exists(key)) consumer.accept(this.get(key));
+ }
+
+ /**
+ * Remove both the value and registered {@link java.util.function.Supplier} associated with provided key.
+ *
+ * @param key the key of the value and registered supplier to remove
+ */
+ public void remove(final String key);
+
+ /**
+ * The keys of the sideEffect which includes registered {@link java.util.function.Supplier} keys.
+ * In essence, that which is possible to get().
+ *
+ * @return the keys of the sideEffect
+ */
+ public Set<String> keys();
+
+ ////////////
+
+ /**
+ * Register a {@link java.util.function.Supplier} with the provided key.
+ * When sideEffects get() are called, if no object exists and there exists a registered supplier for the key, the object is generated.
+ * Registered suppliers are used for the lazy generation of sideEffect data.
+ *
+ * @param key the key to register the supplier with
+ * @param supplier the supplier that will generate an object when get() is called if it hasn't already been created
+ */
+ public void registerSupplier(final String key, final Supplier supplier);
+
+ /**
+ * Get the registered {@link java.util.function.Supplier} associated with the specified key.
+ *
+ * @param key the key associated with the supplier
+ * @param <V> The object type of the supplier
+ * @return A non-empty optional if the supplier exists
+ */
+ public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key);
+
+ /**
+ * A helper method to register a {@link java.util.function.Supplier} if it has not already been registered.
+ *
+ * @param key the key of the supplier to register
+ * @param supplier the supplier to register if the key has not already been registered
+ */
+ public default void registerSupplierIfAbsent(final String key, final Supplier supplier) {
+ if (!this.getRegisteredSupplier(key).isPresent())
+ this.registerSupplier(key, supplier);
+ }
+
+ public <S> void setSack(final Supplier<S> initialValue, final Optional<UnaryOperator<S>> splitOperator);
+
+ public <S> Optional<Supplier<S>> getSackInitialValue();
+
+ public <S> Optional<UnaryOperator<S>> getSackSplitOperator();
+
+ /**
+ * If the sideEffect contains an object associated with the key, return it.
+ * Else if a "with" supplier exists for the key, generate the object, store it in the sideEffects and return the object.
+ * Else use the provided supplier to generate the object, store it in the sideEffects and return the object.
+ * Note that if the orCreate supplier is used, it is NOT registered as a {@link java.util.function.Supplier}.
+ *
+ * @param key the key of the object to get
+ * @param orCreate if the object doesn't exist as an object or suppliable object, then generate it with the specified supplier
+ * @param <V> the return type of the object
+ * @return the object that is either retrieved, generated, or supplier via orCreate
+ */
+ public default <V> V getOrCreate(final String key, final Supplier<V> orCreate) {
+ if (this.exists(key))
+ return this.<V>get(key);
+ final Optional<Supplier<V>> with = this.getRegisteredSupplier(key);
+ if (with.isPresent()) {
+ final V v = with.get().get();
+ this.set(key, v);
+ return v;
+ } else {
+ final V v = orCreate.get();
+ this.set(key, v);
+ return v;
+ }
+ }
+
+ ////////////
+
+ public default <V> void forEach(final BiConsumer<String, V> biConsumer) {
+ this.keys().forEach(key -> biConsumer.accept(key, this.get(key)));
+ }
+
+ /**
+ * In a distributed {@link com.tinkerpop.gremlin.process.computer.GraphComputer} traversal, the sideEffects of the traversal are not a single object within a single JVM.
+ * Instead, the sideEffects are distributed across the graph and the pieces are stored on the computing vertices.
+ * This method is necessary to call when the {@link com.tinkerpop.gremlin.process.Traversal} is processing the {@link com.tinkerpop.gremlin.process.Traverser}s at a particular {@link com.tinkerpop.gremlin.structure.Vertex}.
+ *
+ * @param vertex the vertex where the traversal is currently executing.
+ */
+ public void setLocalVertex(final Vertex vertex);
+
+ /**
+ * Cloning is used to duplicate the sideEffects typically in OLAP environments.
+ *
+ * @return The cloned sideEffects
+ */
+ public TraversalSideEffects clone() throws CloneNotSupportedException;
+
+ /**
+ * Add the current {@link TraversalSideEffects} data and suppliers to the provided {@link TraversalSideEffects}.
+ *
+ * @param sideEffects the sideEffects to add this traversal's sideEffect data to.
+ */
+ public void mergeInto(final TraversalSideEffects sideEffects);
+
+ public static class Exceptions {
+
+ public static IllegalArgumentException sideEffectKeyCanNotBeEmpty() {
+ return new IllegalArgumentException("Side effect key can not be the empty string");
+ }
+
+ public static IllegalArgumentException sideEffectKeyCanNotBeNull() {
+ return new IllegalArgumentException("Side effect key can not be null");
+ }
+
+ public static IllegalArgumentException sideEffectValueCanNotBeNull() {
+ return new IllegalArgumentException("Side effect value can not be null");
+ }
+
+ public static IllegalArgumentException sideEffectDoesNotExist(final String key) {
+ return new IllegalArgumentException("Side effects do not have a value for provided key: " + key);
+ }
+
+ public static UnsupportedOperationException dataTypeOfSideEffectValueNotSupported(final Object val) {
+ return new UnsupportedOperationException(String.format("Side effect value [%s] is of type %s is not supported", val, val.getClass()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalStrategies.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalStrategies.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalStrategies.java
new file mode 100644
index 0000000..c5cf221
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalStrategies.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process;
+
+import com.tinkerpop.gremlin.process.graph.traversal.__;
+import com.tinkerpop.gremlin.process.graph.traversal.strategy.*;
+import com.tinkerpop.gremlin.process.traversal.DefaultTraversalStrategies;
+import com.tinkerpop.gremlin.process.traverser.TraverserGeneratorFactory;
+import com.tinkerpop.gremlin.structure.Edge;
+import com.tinkerpop.gremlin.structure.Graph;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.VertexProperty;
+import com.tinkerpop.gremlin.util.tools.MultiMap;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+public interface TraversalStrategies extends Serializable, Cloneable {
+
+ /**
+ * Return all the {@link TraversalStrategy} singleton instances associated with this {@link TraversalStrategies}.
+ */
+ public List<TraversalStrategy> toList();
+
+ /**
+ * Apply all the {@link TraversalStrategy} optimizers to the {@link Traversal} for the stated {@link TraversalEngine}.
+ * This method must ensure that the strategies are sorted prior to application.
+ *
+ * @param traversal the traversal to apply the strategies to
+ * @param engine the engine that the traversal is going to be executed on
+ */
+ public void applyStrategies(final Traversal.Admin<?, ?> traversal, final TraversalEngine engine);
+
+ /**
+ * Add all the provided {@link TraversalStrategy} instances to the current collection.
+ * When all the provided strategies have been added, the collection is resorted.
+ *
+ * @param strategies the traversal strategies to add
+ * @return the newly updated/sorted traversal strategies collection
+ */
+ public TraversalStrategies addStrategies(final TraversalStrategy... strategies);
+
+ /**
+ * Remove all the provided {@link TraversalStrategy} classes from the current collection.
+ * When all the provided strategies have been removed, the collection is resorted.
+ *
+ * @param strategyClasses the traversal strategies to remove by their class
+ * @return the newly updated/sorted traversal strategies collection
+ */
+ @SuppressWarnings("unchecked")
+ public TraversalStrategies removeStrategies(final Class<? extends TraversalStrategy>... strategyClasses);
+
+ /**
+ * {@inheritDoc}
+ */
+ public TraversalStrategies clone() throws CloneNotSupportedException;
+
+ /**
+ * Get the {@link TraverserGeneratorFactory} to use to generate traversers.
+ */
+ public TraverserGeneratorFactory getTraverserGeneratorFactory();
+
+ /**
+ * Set the {@link TraverserGeneratorFactory} to use for determining which {@link Traverser} type to generate for the {@link Traversal}.
+ *
+ * @param traverserGeneratorFactory the factory to use
+ */
+ public void setTraverserGeneratorFactory(final TraverserGeneratorFactory traverserGeneratorFactory);
+
+ /**
+ * Sorts the list of provided strategies such that the {@link com.tinkerpop.gremlin.process.TraversalStrategy#applyPost()}
+ * and {@link TraversalStrategy#applyPrior()} dependencies are respected.
+ * <p/>
+ * Note, that the order may not be unique.
+ *
+ * @param strategies the traversal strategies to sort
+ */
+ public static void sortStrategies(final List<? extends TraversalStrategy> strategies) {
+ final Map<Class<? extends TraversalStrategy>, Set<Class<? extends TraversalStrategy>>> dependencyMap = new HashMap<>();
+ final Set<Class<? extends TraversalStrategy>> strategyClass = new HashSet<>(strategies.size());
+ //Initialize data structure
+ strategies.forEach(s -> strategyClass.add(s.getClass()));
+
+ //Initialize all the dependencies
+ strategies.forEach(strategy -> {
+ strategy.applyPrior().forEach(s -> {
+ if (strategyClass.contains(s)) MultiMap.put(dependencyMap, s, strategy.getClass());
+ });
+ strategy.applyPost().forEach(s -> {
+ if (strategyClass.contains(s)) MultiMap.put(dependencyMap, strategy.getClass(), s);
+ });
+ });
+ //Now, compute transitive closure until convergence
+ boolean updated;
+ do {
+ updated = false;
+ for (final Class<? extends TraversalStrategy> sc : strategyClass) {
+ List<Class<? extends TraversalStrategy>> toAdd = null;
+ for (Class<? extends TraversalStrategy> before : MultiMap.get(dependencyMap, sc)) {
+ final Set<Class<? extends TraversalStrategy>> beforeDep = MultiMap.get(dependencyMap, before);
+ if (!beforeDep.isEmpty()) {
+ if (toAdd == null) toAdd = new ArrayList<>(beforeDep.size());
+ toAdd.addAll(beforeDep);
+ }
+ }
+ if (toAdd != null && MultiMap.putAll(dependencyMap, sc, toAdd)) updated = true;
+ }
+ } while (updated);
+ Collections.sort(strategies, new Comparator<TraversalStrategy>() {
+ @Override
+ public int compare(final TraversalStrategy s1, final TraversalStrategy s2) {
+ boolean s1Before = MultiMap.containsEntry(dependencyMap, s1.getClass(), s2.getClass());
+ boolean s2Before = MultiMap.containsEntry(dependencyMap, s2.getClass(), s1.getClass());
+ if (s1Before && s2Before)
+ throw new IllegalStateException("Cyclic dependency between traversal strategies: ["
+ + s1.getClass().getName() + ", " + s2.getClass().getName() + ']');
+ if (s1Before) return -1;
+ else if (s2Before) return 1;
+ else return 0;
+ }
+ });
+ }
+
+ public static final class GlobalCache {
+
+ private static final Map<Class, TraversalStrategies> CACHE = new HashMap<>();
+
+ static {
+ final TraversalStrategies coreStrategies = new DefaultTraversalStrategies();
+ coreStrategies.addStrategies(
+ DedupOptimizerStrategy.instance(),
+ RangeByIsCountStrategy.instance(),
+ IdentityRemovalStrategy.instance(),
+ SideEffectCapStrategy.instance(),
+ MatchWhereStrategy.instance(),
+ ComparatorHolderRemovalStrategy.instance(),
+ ReducingStrategy.instance(),
+ LabeledEndStepStrategy.instance(),
+ EngineDependentStrategy.instance(),
+ ProfileStrategy.instance(),
+ SideEffectRegistrationStrategy.instance(),
+ TraversalVerificationStrategy.instance(),
+ ConjunctionStrategy.instance());
+ try {
+ CACHE.put(Graph.class, coreStrategies.clone());
+ CACHE.put(Vertex.class, coreStrategies.clone());
+ CACHE.put(Edge.class, coreStrategies.clone());
+ CACHE.put(VertexProperty.class, coreStrategies.clone());
+ CACHE.put(__.class, new DefaultTraversalStrategies());
+ } catch (final CloneNotSupportedException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public static void registerStrategies(final Class emanatingClass, final TraversalStrategies traversalStrategies) {
+ CACHE.put(emanatingClass, traversalStrategies);
+ }
+
+ public static TraversalStrategies getStrategies(final Class emanatingClass) {
+ final TraversalStrategies traversalStrategies = CACHE.get(emanatingClass);
+ if (null == traversalStrategies) {
+ if (__.class.isAssignableFrom(emanatingClass))
+ return CACHE.get(__.class);
+ else if (Graph.class.isAssignableFrom(emanatingClass))
+ return CACHE.get(Graph.class);
+ else if (Vertex.class.isAssignableFrom(emanatingClass))
+ return CACHE.get(Vertex.class);
+ else if (Edge.class.isAssignableFrom(emanatingClass))
+ return CACHE.get(Edge.class);
+ else if (VertexProperty.class.isAssignableFrom(emanatingClass))
+ return CACHE.get(VertexProperty.class);
+ else
+ return new DefaultTraversalStrategies();
+ // throw new IllegalStateException("The provided class has no registered traversal strategies: " + emanatingClass);
+ }
+ return traversalStrategies;
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalStrategy.java
new file mode 100644
index 0000000..1ed0245
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalStrategy.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * A {@link TraversalStrategy} defines a particular atomic operation for mutating a {@link Traversal} prior to its evaluation.
+ * Traversal strategies are typically used for optimizing a traversal for the particular underlying graph engine.
+ * Traversal strategies implement {@link Comparable} and thus are sorted to determine their evaluation order.
+ * A TraversalStrategy should not have a public constructor as they should not maintain state between applications.
+ * Make use of a singleton instance() object to reduce object creation on the JVM.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+public interface TraversalStrategy extends Serializable{
+
+ // A TraversalStrategy should not have a public constructor
+ // Make use of a singleton instance() object to reduce object creation on the JVM
+ // Moreover they are stateless objects.
+
+ public void apply(final Traversal.Admin<?, ?> traversal, final TraversalEngine traversalEngine);
+
+ public default Set<Class<? extends TraversalStrategy>> applyPrior() {
+ return Collections.emptySet();
+ }
+
+ public default Set<Class<? extends TraversalStrategy>> applyPost() {
+ return Collections.emptySet();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traverser.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traverser.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traverser.java
new file mode 100644
index 0000000..dfa2ae9
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traverser.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process;
+
+import com.tinkerpop.gremlin.structure.Graph;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.util.detached.Attachable;
+
+import java.io.Serializable;
+
+/**
+ * A {@link Traverser} represents the current state of an object flowing through a {@link Traversal}.
+ * A traverser maintains a reference to the current object, a traverser-local "sack", a traversal-global sideEffect, a bulk count, and a path history.
+ * <p/>
+ * Different types of traverser can exist depending on the semantics of the traversal and the desire for
+ * space/time optimizations of the developer.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Traverser<T> extends Serializable, Comparable<Traverser<T>>, Cloneable {
+
+ /**
+ * Get the object that the traverser is current at.
+ *
+ * @return The current object of the traverser
+ */
+ public T get();
+
+ /**
+ * Get the sack local sack object of this traverser.
+ *
+ * @param <S> the type of the sack object
+ * @return the sack object
+ */
+ public <S> S sack();
+
+ /**
+ * Set the traversers sack object to the provided value ("sack the value").
+ *
+ * @param object the new value of the traverser's sack
+ * @param <S> the type of the object
+ */
+ public <S> void sack(final S object);
+
+ /**
+ * Get the current path of the traverser.
+ *
+ * @return The path of the traverser
+ */
+ public Path path();
+
+ /**
+ * Get the object associated with the specified step-label in the traverser's path history.
+ *
+ * @param stepLabel the step-label in the path to access
+ * @param <A> the type of the object
+ * @return the object associated with that path label (if more than one object occurs at that step, a list is returned)
+ */
+ public default <A> A path(final String stepLabel) {
+ return this.path().get(stepLabel);
+ }
+
+ /**
+ * Return the number of times the traverser has gone through a looping section of a traversal.
+ *
+ * @return The number of times the traverser has gone through a loop
+ */
+ public int loops();
+
+ /**
+ * A traverser may represent a grouping of traversers to allow for more efficient data propagation.
+ *
+ * @return the number of traversers represented in this traverser.
+ */
+ public long bulk();
+
+ /**
+ * Get a particular value from the sideEffects of the traverser.
+ *
+ * @param sideEffectKey the key of the value to get from the sideEffects
+ * @param <A> the type of the returned object
+ * @return the object in the sideEffects of the respective key
+ */
+ public default <A> A sideEffects(final String sideEffectKey) {
+ return this.asAdmin().getSideEffects().get(sideEffectKey);
+ }
+
+ /**
+ * Set a particular value in the sideEffects of the traverser.
+ *
+ * @param sideEffectKey the key of the value to set int the sideEffects
+ * @param sideEffectValue the value to set for the sideEffect key
+ */
+ public default void sideEffects(final String sideEffectKey, final Object sideEffectValue) {
+ this.asAdmin().getSideEffects().set(sideEffectKey, sideEffectValue);
+ }
+
+ /**
+ * If the underlying object of the traverser is comparable, compare it with the other traverser.
+ *
+ * @param other the other traverser that presumably has a comparable internal object
+ * @return the comparison of the two objects of the traversers
+ * @throws ClassCastException if the object of the traverser is not comparable
+ */
+ @Override
+ public default int compareTo(final Traverser<T> other) throws ClassCastException {
+ return ((Comparable) this.get()).compareTo(other.get());
+ }
+
+ /**
+ * Typecast the traverser to a "system traverser" so {@link Traverser.Admin} methods can be accessed.
+ * This is used as a helper method to avoid the awkwardness of <code>((Traverser.Administrative)traverser)</code>.
+ * The default implementation simply returns "this" type casted to {@link Traverser.Admin}.
+ *
+ * @return The type-casted traverser
+ */
+ public default Admin<T> asAdmin() {
+ return (Admin<T>) this;
+ }
+
+ /**
+ * Traverser cloning is important when splitting a traverser at a bifurcation point in a traversal.
+ */
+ public Traverser<T> clone() throws CloneNotSupportedException;
+
+ /**
+ * The methods in System.Traverser are useful to underlying Step and Traversal implementations.
+ * They should not be accessed by the user during lambda-based manipulations.
+ */
+ public interface Admin<T> extends Traverser<T>, Attachable<Admin<T>> {
+
+ public static final String HALT = "halt";
+
+ /**
+ * When two traversers are {@link Traverser#equals} to each other, then they can be merged.
+ * This method is used to merge the traversers into a single traverser.
+ * This is used for optimization where instead of enumerating all traversers, they can be counted.
+ *
+ * @param other the other traverser to merge into this traverser. Once merged, the other can be garbage collected.
+ */
+ public void merge(final Admin<?> other);
+
+ /**
+ * Generate a child traverser of the current traverser for current as step and new object location.
+ * The child has the path history, future, and loop information of the parent.
+ * The child extends that path history with the current as and provided R-object.
+ *
+ * @param r The current object of the child
+ * @param step The step yielding the split
+ * @param <R> The current object type of the child
+ * @return The split traverser
+ */
+ public <R> Admin<R> split(final R r, final Step<T, R> step);
+
+ /**
+ * Generate a sibling traverser of the current traverser with a full copy of all state within the sibling.
+ *
+ * @return The split traverser
+ */
+ public Admin<T> split();
+
+ /**
+ * Set the current object location of the traverser.
+ *
+ * @param t The current object of the traverser
+ */
+ public void set(final T t);
+
+ /**
+ * Increment the number of times the traverser has gone through a looping section of traversal.
+ * The step label is important to create a stack of loop counters when within a nested context.
+ * If the provided label is not the same as the current label on the stack, add a new loop counter.
+ * If the provided label is the same as the current label on the stack, increment the loop counter.
+ *
+ * @param stepLabel the label of the step that is doing the incrementing
+ */
+ public void incrLoops(final String stepLabel);
+
+ /**
+ * Set the number of times the traverser has gone through a loop back to 0.
+ * When a traverser exits a looping construct, this method should be called.
+ * In a nested loop context, the highest stack loop counter should be removed.
+ */
+ public void resetLoops();
+
+ /**
+ * Get the step id of where the traverser is located.
+ * This is typically used in multi-machine systems that require the movement of
+ * traversers between different traversal instances.
+ *
+ * @return The future step for the traverser
+ */
+ public String getStepId();
+
+ /**
+ * Set the step id of where the traverser is located.
+ * If the future is {@link Traverser.Admin#HALT}, then {@link Traverser.Admin#isHalted()} is true.
+ *
+ * @param stepId The future step of the traverser
+ */
+ public void setStepId(final String stepId);
+
+ /**
+ * If the traverser has "no future" then it is done with its lifecycle.
+ * This does not mean that the traverser is "dead," only that it has successfully passed through a {@link Traversal}.
+ *
+ * @return Whether the traverser is done executing or not
+ */
+ public default boolean isHalted() {
+ return getStepId().equals(HALT);
+ }
+
+ /**
+ * Set the number of traversers represented by this traverser.
+ *
+ * @param count the number of traversers
+ */
+ public void setBulk(final long count);
+
+ /**
+ * Prepare the traverser for migration across a JVM boundary.
+ *
+ * @return The deflated traverser
+ */
+ public Admin<T> detach();
+
+ /**
+ * Regenerate the detached traverser given its location at a particular vertex.
+ *
+ * @param hostVertex The vertex that is hosting the traverser
+ * @return The inflated traverser
+ */
+ @Override
+ public Admin<T> attach(final Vertex hostVertex);
+
+ /**
+ * Traversers can not attach to graphs and thus, an {@link UnsupportedOperationException} is thrown.
+ *
+ * @param graph the graph to attach the traverser to, which it can't.
+ * @return nothing as an exception is thrown
+ * @throws UnsupportedOperationException is always thrown as it makes no sense to attach a traverser to a graph
+ */
+ @Override
+ public default Admin<T> attach(final Graph graph) throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("A traverser can only exist at the vertices of the graph, not the graph itself");
+ }
+
+ /**
+ * Set the sideEffects of the {@link Traversal}. Given that traversers can move between machines,
+ * it may be important to re-set this when the traverser crosses machine boundaries.
+ *
+ * @param sideEffects the sideEffects of the traversal.
+ */
+ public void setSideEffects(final TraversalSideEffects sideEffects);
+
+ /**
+ * Get the sideEffects associated with the traversal of the traverser.
+ *
+ * @return the traversal sideEffects of the traverser
+ */
+ public TraversalSideEffects getSideEffects();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraverserGenerator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraverserGenerator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraverserGenerator.java
new file mode 100644
index 0000000..e0ca217
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraverserGenerator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process;
+
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface TraverserGenerator {
+
+ public Set<TraverserRequirement> getProvidedRequirements();
+
+ public <S> Traverser.Admin<S> generate(final S start, final Step<S, ?> startStep, final long initialBulk);
+
+ public default <S> Iterator<Traverser.Admin<S>> generateIterator(final Iterator<S> starts, final Step<S, ?> startStep, final long initialBulk) {
+ return new Iterator<Traverser.Admin<S>>() {
+ @Override
+ public boolean hasNext() {
+ return starts.hasNext();
+ }
+
+ @Override
+ public Traverser.Admin<S> next() {
+ return generate(starts.next(), startStep, initialBulk);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ComputerResult.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ComputerResult.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ComputerResult.java
new file mode 100644
index 0000000..d7bfb90
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ComputerResult.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer;
+
+import com.tinkerpop.gremlin.structure.Graph;
+
+/**
+ * The result of the {@link GraphComputer}'s computation. This is returned in a {@link java.util.concurrent.Future} by {@link GraphComputer#submit}.
+ * A GraphComputer computation yields two things: an updated view of the computed on {@link Graph} and any computational sideEffects called {@link Memory}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface ComputerResult extends AutoCloseable {
+
+ /**
+ * Get the view of the original {@link Graph} computed on by the GraphComputer.
+ *
+ * @return The computed graph
+ */
+ public Graph graph();
+
+ /**
+ * Get the computational sideEffects called {@link Memory} of the GraphComputer.
+ *
+ * @return the computed memory
+ */
+ public Memory memory();
+
+ /**
+ * Close the computed {@link GraphComputer} result. The semantics of "close" differ depending on the underlying implementation.
+ * In general, when a {@link ComputerResult} is closed, the computed values are no longer available to the user.
+ *
+ * @throws Exception
+ */
+ @Override
+ public void close() throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
new file mode 100644
index 0000000..2ad3e6d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer;
+
+import java.util.concurrent.Future;
+
+/**
+ * The {@link GraphComputer} is responsible for the execution of a {@link VertexProgram} and then a set of {@link MapReduce} jobs
+ * over the vertices in the {@link com.tinkerpop.gremlin.structure.Graph}. It is up to the {@link GraphComputer} implementation to determine the
+ * appropriate memory structures given the computing substrate. {@link GraphComputer} implementations also
+ * maintains levels of memory {@link Isolation}: Bulk Synchronous and Dirty Bulk Synchronous.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+public interface GraphComputer {
+
+ public enum Isolation {
+ /**
+ * Computations are carried out in a bulk synchronous manner.
+ * The results of a vertex property update are only visible after the round is complete.
+ */
+ BSP,
+ /**
+ * Computations are carried out in an bulk asynchronous manner.
+ * The results of a vertex property update are visible before the end of the round.
+ */
+ DIRTY_BSP
+ }
+
+ /**
+ * Set the {@link Isolation} of the computation.
+ *
+ * @param isolation the isolation of the computation
+ * @return the updated GraphComputer with newly set isolation
+ */
+ public GraphComputer isolation(final Isolation isolation);
+
+ /**
+ * Set the {@link VertexProgram} to be executed by the {@link GraphComputer}.
+ * There can only be one VertexProgram for the GraphComputer.
+ *
+ * @param vertexProgram the VertexProgram to be executed
+ * @return the updated GraphComputer with newly set VertexProgram
+ */
+ public GraphComputer program(final VertexProgram vertexProgram);
+
+ /**
+ * Add a {@link MapReduce} job to the set of MapReduce jobs to be executed by the {@link GraphComputer}.
+ * There can be any number of MapReduce jobs.
+ *
+ * @param mapReduce the MapReduce job to add to the computation
+ * @return the updated GraphComputer with newly added MapReduce job
+ */
+ public GraphComputer mapReduce(final MapReduce mapReduce);
+
+ /**
+ * Submit the {@link VertexProgram} and the set of {@link MapReduce} jobs for execution by the {@link GraphComputer}.
+ *
+ * @return a {@link Future} denoting a reference to the asynchronous computation and where to get the {@link com.tinkerpop.gremlin.process.computer.util.DefaultComputerResult} when its is complete.
+ */
+ public Future<ComputerResult> submit();
+
+ public default Features features() {
+ return new Features() {
+ };
+ }
+
+ public interface Features {
+ public default boolean supportsWorkerPersistenceBetweenIterations() {
+ return true;
+ }
+
+ public default boolean supportsGlobalMessageScopes() {
+ return true;
+ }
+
+ public default boolean supportsLocalMessageScopes() {
+ return true;
+ }
+
+ public default boolean supportsVertexAddition() {
+ return true;
+ }
+
+ public default boolean supportsVertexRemoval() {
+ return true;
+ }
+
+ public default boolean supportsVertexPropertyAddition() {
+ return true;
+ }
+
+ public default boolean supportsVertexPropertyRemoval() {
+ return true;
+ }
+
+ public default boolean supportsEdgeAddition() {
+ return true;
+ }
+
+ public default boolean supportsEdgeRemoval() {
+ return true;
+ }
+
+ public default boolean supportsEdgePropertyAddition() {
+ return true;
+ }
+
+ public default boolean supportsEdgePropertyRemoval() {
+ return true;
+ }
+
+ public default boolean supportsIsolation(final Isolation isolation) {
+ return true;
+ }
+ }
+
+ public static class Exceptions {
+ public static IllegalStateException adjacentElementPropertiesCanNotBeRead() {
+ return new IllegalStateException("The properties of an adjacent element can not be read, only its id");
+ }
+
+ public static IllegalStateException adjacentElementPropertiesCanNotBeWritten() {
+ return new IllegalStateException("The properties of an adjacent element can not be written");
+ }
+
+ public static IllegalArgumentException providedKeyIsNotAnElementComputeKey(final String key) {
+ return new IllegalArgumentException("The provided key is not an element compute key: " + key);
+ }
+
+ public static IllegalArgumentException providedKeyIsNotAMemoryComputeKey(final String key) {
+ return new IllegalArgumentException("The provided key is not a memory compute key: " + key);
+ }
+
+ public static IllegalStateException adjacentVerticesCanNotBeQueried() {
+ return new IllegalStateException("It is not possible to query an adjacent vertex in a vertex program");
+ }
+
+ public static IllegalArgumentException isolationNotSupported(final Isolation isolation) {
+ return new IllegalArgumentException("The provided isolation is not supported by this graph computer: " + isolation);
+ }
+
+ public static IllegalStateException computerHasAlreadyBeenSubmittedAVertexProgram() {
+ return new IllegalStateException("This computer has already had a vertex program submitted to it");
+ }
+
+ public static IllegalStateException computerHasNoVertexProgramNorMapReducers() {
+ return new IllegalStateException("The computer has no vertex program or map reducers to execute");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/KeyValue.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/KeyValue.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/KeyValue.java
new file mode 100644
index 0000000..ca2af61
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/KeyValue.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer;
+
+import java.io.Serializable;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class KeyValue<K, V> implements Serializable {
+
+ private final K key;
+ private final V value;
+ private static final String EMPTY_STRING = "";
+ private static final String TAB = "\t";
+ private static final String NULL = "null";
+
+ public KeyValue(final K key, final V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public K getKey() {
+ return this.key;
+ }
+
+ public V getValue() {
+ return this.value;
+ }
+
+ public String toString() {
+ if (this.key instanceof MapReduce.NullObject && this.value instanceof MapReduce.NullObject) {
+ return EMPTY_STRING;
+ } else if (this.key instanceof MapReduce.NullObject) {
+ return makeString(this.value);
+ } else if (this.value instanceof MapReduce.NullObject) {
+ return makeString(this.key);
+ } else {
+ return makeString(this.key) + TAB + makeString(this.value);
+ }
+ }
+
+ private static final String makeString(final Object object) {
+ return null == object ? NULL : object.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
new file mode 100644
index 0000000..b669aa9
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer;
+
+import com.tinkerpop.gremlin.structure.Vertex;
+import org.apache.commons.configuration.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Optional;
+
+/**
+ * A MapReduce is composed of map(), combine(), and reduce() stages.
+ * The map() stage processes the vertices of the {@link com.tinkerpop.gremlin.structure.Graph} in a logically parallel manner.
+ * The combine() stage aggregates the values of a particular map emitted key prior to sending across the cluster.
+ * The reduce() stage aggregates the values of the combine/map emitted keys for the keys that hash to the current machine in the cluster.
+ * The interface presented here is nearly identical to the interface popularized by Hadoop save the the map() is over the vertices of the graph.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface MapReduce<MK, MV, RK, RV, R> extends Cloneable {
+
+ public static final String MAP_REDUCE = "gremlin.mapReduce";
+
+ /**
+ * MapReduce is composed of three stages: map, combine, and reduce.
+ */
+ public static enum Stage {
+ MAP, COMBINE, REDUCE
+ }
+
+ /**
+ * When it is necessary to store the state of a MapReduce job, this method is called.
+ * This is typically required when the MapReduce job needs to be serialized to another machine.
+ * Note that what is stored is simply the instance state, not any processed data.
+ *
+ * @param configuration the configuration to store the state of the MapReduce job in.
+ */
+ public default void storeState(final Configuration configuration) {
+ configuration.setProperty(MAP_REDUCE, this.getClass().getName());
+ }
+
+ /**
+ * When it is necessary to load the state of a MapReduce job, this method is called.
+ * This is typically required when the MapReduce job needs to be serialized to another machine.
+ * Note that what is loaded is simply the instance state, not any processed data.
+ * <p/>
+ * It is important that the state loaded from loadState() is identical to any state created from a constructor.
+ * For those GraphComputers that do not need to use Configurations to migrate state between JVMs, the constructor will only be used.
+ *
+ * @param configuration the configuration to load the state of the MapReduce job from.
+ */
+ public default void loadState(final Configuration configuration) {
+
+ }
+
+ /**
+ * A MapReduce job can be map-only, map-reduce-only, or map-combine-reduce.
+ * Before executing the particular stage, this method is called to determine if the respective stage is defined.
+ * This method should return true if the respective stage as a non-default method implementation.
+ *
+ * @param stage the stage to check for definition.
+ * @return whether that stage should be executed.
+ */
+ public boolean doStage(final Stage stage);
+
+ /**
+ * The map() method is logically executed at all vertices in the graph in parallel.
+ * The map() method emits key/value pairs given some analysis of the data in the vertices (and/or its incident edges).
+ *
+ * @param vertex the current vertex being map() processed.
+ * @param emitter the component that allows for key/value pairs to be emitted to the next stage.
+ */
+ public default void map(final Vertex vertex, final MapEmitter<MK, MV> emitter) {
+ }
+
+ /**
+ * The combine() method is logically executed at all "machines" in parallel.
+ * The combine() method pre-combines the values for a key prior to propagation over the wire.
+ * The combine() method must emit the same key/value pairs as the reduce() method.
+ * If there is a combine() implementation, there must be a reduce() implementation.
+ * If the MapReduce implementation is single machine, it can skip executing this method as reduce() is sufficient.
+ *
+ * @param key the key that has aggregated values
+ * @param values the aggregated values associated with the key
+ * @param emitter the component that allows for key/value pairs to be emitted to the reduce stage.
+ */
+ public default void combine(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter) {
+ }
+
+ /**
+ * The reduce() method is logically on the "machine" the respective key hashes to.
+ * The reduce() method combines all the values associated with the key and emits key/value pairs.
+ *
+ * @param key the key that has aggregated values
+ * @param values the aggregated values associated with the key
+ * @param emitter the component that allows for key/value pairs to be emitted as the final result.
+ */
+ public default void reduce(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter) {
+ }
+
+ /**
+ * If a {@link Comparator} is provided, then all pairs leaving the {@link MapEmitter} are sorted.
+ * The sorted results are either fed sorted to the combine/reduce-stage or as the final output.
+ * If sorting is not required, then {@link Optional#empty} should be returned as sorting is computationally expensive.
+ * The default implementation returns {@link Optional#empty}.
+ *
+ * @return an {@link Optional} of a comparator for sorting the map output.
+ */
+ public default Optional<Comparator<MK>> getMapKeySort() {
+ return Optional.empty();
+ }
+
+ /**
+ * If a {@link Comparator} is provided, then all pairs leaving the {@link ReduceEmitter} are sorted.
+ * If sorting is not required, then {@link Optional#empty} should be returned as sorting is computationally expensive.
+ * The default implementation returns {@link Optional#empty}.
+ *
+ * @return an {@link Optional} of a comparator for sorting the reduce output.
+ */
+ public default Optional<Comparator<RK>> getReduceKeySort() {
+ return Optional.empty();
+ }
+
+ /**
+ * The key/value pairs emitted by reduce() (or map() in a map-only job) can be iterated to generate a local JVM Java object.
+ *
+ * @param keyValues the key/value pairs that were emitted from reduce() (or map() in a map-only job)
+ * @return the resultant object formed from the emitted key/values.
+ */
+ public R generateFinalResult(final Iterator<KeyValue<RK, RV>> keyValues);
+
+ /**
+ * The results of the MapReduce job are associated with a memory-key to ultimately be stored in {@link Memory}.
+ *
+ * @return the memory key of the generated result object.
+ */
+ public String getMemoryKey();
+
+ /**
+ * The final result can be generated and added to {@link Memory} and accessible via {@link com.tinkerpop.gremlin.process.computer.util.DefaultComputerResult}.
+ * The default simply takes the object from generateFinalResult() and adds it to the Memory given getMemoryKey().
+ *
+ * @param memory the memory of the {@link GraphComputer}
+ * @param keyValues the key/value pairs emitted from reduce() (or map() in a map only job).
+ */
+ public default void addResultToMemory(final Memory.Admin memory, final Iterator<KeyValue<RK, RV>> keyValues) {
+ memory.set(this.getMemoryKey(), this.generateFinalResult(keyValues));
+ }
+
+ /**
+ * When multiple workers on a single machine need MapReduce instances, it is possible to use clone.
+ * This will provide a speedier way of generating instances, over the {@link MapReduce#storeState} and {@link MapReduce#loadState} model.
+ * The default implementation simply returns the object as it assumes that the MapReduce instance is a stateless singleton.
+ *
+ * @return A clone of the MapReduce object
+ * @throws CloneNotSupportedException
+ */
+ public MapReduce<MK, MV, RK, RV, R> clone() throws CloneNotSupportedException;
+
+ /**
+ * A helper method to construct a {@link MapReduce} given the content of the supplied configuration.
+ * The class of the MapReduce is read from the {@link MapReduce#MAP_REDUCE} static configuration key.
+ * Once the MapReduce is constructed, {@link MapReduce#loadState} method is called with the provided configuration.
+ *
+ * @param configuration A configuration with requisite information to build a MapReduce
+ * @return the newly constructed MapReduce
+ */
+ public static <M extends MapReduce<MK, MV, RK, RV, R>, MK, MV, RK, RV, R> M createMapReduce(final Configuration configuration) {
+ try {
+ final Class<M> mapReduceClass = (Class) Class.forName(configuration.getString(MAP_REDUCE));
+ final Constructor<M> constructor = mapReduceClass.getDeclaredConstructor();
+ constructor.setAccessible(true);
+ final M mapReduce = constructor.newInstance();
+ mapReduce.loadState(configuration);
+ return mapReduce;
+ } catch (final Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ //////////////////
+
+ /**
+ * The MapEmitter is used to emit key/value pairs from the map() stage of the MapReduce job.
+ * The implementation of MapEmitter is up to the vendor, not the developer.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+ public interface MapEmitter<K, V> {
+ public void emit(final K key, final V value);
+
+ /**
+ * A default method that assumes the key is {@link com.tinkerpop.gremlin.process.computer.MapReduce.NullObject}.
+ *
+ * @param value the value to emit.
+ */
+ public default void emit(final V value) {
+ this.emit((K) MapReduce.NullObject.instance(), value);
+ }
+ }
+
+ /**
+ * The ReduceEmitter is used to emit key/value pairs from the combine() and reduce() stages of the MapReduce job.
+ * The implementation of ReduceEmitter is up to the vendor, not the developer.
+ *
+ * @param <OK> the key type
+ * @param <OV> the value type
+ */
+ public interface ReduceEmitter<OK, OV> {
+ public void emit(final OK key, OV value);
+
+ /**
+ * A default method that assumes the key is {@link com.tinkerpop.gremlin.process.computer.MapReduce.NullObject}.
+ *
+ * @param value the value to emit.
+ */
+ public default void emit(final OV value) {
+ this.emit((OK) MapReduce.NullObject.instance(), value);
+ }
+ }
+
+ //////////////////
+
+ /**
+ * A convenience singleton when a single key is needed so that all emitted values converge to the same combiner/reducer.
+ */
+ public static class NullObject implements Comparable<NullObject>, Serializable {
+ private static final NullObject INSTANCE = new NullObject();
+ private static final String NULL_OBJECT = "";
+
+ public static NullObject instance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public int hashCode() {
+ return 666666666;
+ }
+
+ @Override
+ public boolean equals(final Object object) {
+ return object instanceof NullObject;
+ }
+
+ @Override
+ public int compareTo(final NullObject nullObject) {
+ return 0;
+ }
+
+ @Override
+ public String toString() {
+ return NULL_OBJECT;
+ }
+
+ private void readObject(final ObjectInputStream inputStream) throws ClassNotFoundException, IOException {
+
+ }
+
+ private void writeObject(final ObjectOutputStream outputStream) throws IOException {
+
+ }
+ }
+}