You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/26 19:16:15 UTC
[06/15] flink git commit: [FLINK-6709] [gelly] Activate strict
checkstyle for flink-gellies
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
index 7e8ebd7..c30b1a7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
@@ -54,22 +54,21 @@ import java.util.Map;
* This class represents iterative graph computations, programmed in a vertex-centric perspective.
* It is a special case of <i>Bulk Synchronous Parallel</i> computation. The paradigm has also been
* implemented by Google's <i>Pregel</i> system and by <i>Apache Giraph</i>.
- * <p>
- * Vertex centric algorithms operate on graphs, which are defined through vertices and edges. The
+ *
+ * <p>Vertex centric algorithms operate on graphs, which are defined through vertices and edges. The
* algorithms send messages along the edges and update the state of vertices based on
* the old state and the incoming messages. All vertices have an initial state.
* The computation terminates once no vertex receives any message anymore.
* Additionally, a maximum number of iterations (supersteps) may be specified.
- * <p>
- * The computation is here represented by one function:
+ *
+ * <p>The computation is here represented by one function:
* <ul>
* <li>The {@link ComputeFunction} receives incoming messages, may update the state for
* the vertex, and sends messages along the edges of the vertex.
* </li>
* </ul>
- * <p>
*
- * Vertex-centric graph iterations are run by calling
+ * <p>Vertex-centric graph iterations are run by calling
* {@link Graph#runVertexCentricIteration(ComputeFunction, MessageCombiner, int)}.
*
* @param <K> The type of the vertex key (the vertex identifier).
@@ -77,25 +76,25 @@ import java.util.Map;
* @param <Message> The type of the message sent between vertices along the edges.
* @param <EV> The type of the values that are associated with the edges.
*/
-public class VertexCentricIteration<K, VV, EV, Message>
+public class VertexCentricIteration<K, VV, EV, Message>
implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>> {
private final ComputeFunction<K, VV, EV, Message> computeFunction;
private final MessageCombiner<K, Message> combineFunction;
-
+
private final DataSet<Edge<K, EV>> edgesWithValue;
-
+
private final int maximumNumberOfIterations;
-
+
private final TypeInformation<Message> messageType;
-
+
private DataSet<Vertex<K, VV>> initialVertices;
private VertexCentricConfiguration configuration;
// ----------------------------------------------------------------------------------
-
+
private VertexCentricIteration(ComputeFunction<K, VV, EV, Message> cf,
DataSet<Edge<K, EV>> edgesWithValue, MessageCombiner<K, Message> mc,
int maximumNumberOfIterations) {
@@ -108,45 +107,44 @@ public class VertexCentricIteration<K, VV, EV, Message>
this.computeFunction = cf;
this.edgesWithValue = edgesWithValue;
this.combineFunction = mc;
- this.maximumNumberOfIterations = maximumNumberOfIterations;
+ this.maximumNumberOfIterations = maximumNumberOfIterations;
this.messageType = getMessageType(cf);
}
-
+
private TypeInformation<Message> getMessageType(ComputeFunction<K, VV, EV, Message> cf) {
return TypeExtractor.createTypeInfo(cf, ComputeFunction.class, cf.getClass(), 3);
}
-
+
// --------------------------------------------------------------------------------------------
// Custom Operator behavior
// --------------------------------------------------------------------------------------------
-
+
/**
* Sets the input data set for this operator. In the case of this operator this input data set represents
* the set of vertices with their initial state.
- *
+ *
* @param inputData The input data set, which in the case of this operator represents the set of
* vertices with their initial state.
- *
+ *
* @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
*/
@Override
public void setInput(DataSet<Vertex<K, VV>> inputData) {
this.initialVertices = inputData;
}
-
+
/**
* Creates the operator that represents this vertex-centric graph computation.
- * <p>
- * The Pregel iteration is mapped to delta iteration as follows.
- * The solution set consists of the set of active vertices and the workset contains the set of messages
- * send to vertices during the previous superstep. Initially, the workset contains a null message for each vertex.
- * In the beginning of a superstep, the solution set is joined with the workset to produce
- * a dataset containing tuples of vertex state and messages (vertex inbox).
- * The superstep compute UDF is realized with a coGroup between the vertices with inbox and the graph edges.
- * The output of the compute UDF contains both the new vertex values and the new messages produced.
- * These are directed to the solution set delta and new workset, respectively, with subsequent flatMaps.
- * <p/>
- *
+ *
+ * <p>The Pregel iteration is mapped to delta iteration as follows.
+ * The solution set consists of the set of active vertices and the workset contains the set of messages
+ * sent to vertices during the previous superstep. Initially, the workset contains a null message for each vertex.
+ * In the beginning of a superstep, the solution set is joined with the workset to produce
+ * a dataset containing tuples of vertex state and messages (vertex inbox).
+ * The superstep compute UDF is realized with a coGroup between the vertices with inbox and the graph edges.
+ * The output of the compute UDF contains both the new vertex values and the new messages produced.
+ * These are directed to the solution set delta and new workset, respectively, with subsequent flatMaps.
+ *
* @return The operator that represents this vertex-centric graph computation.
*/
@Override
@@ -226,15 +224,15 @@ public class VertexCentricIteration<K, VV, EV, Message>
/**
* Creates a new vertex-centric iteration operator.
- *
+ *
* @param edgesWithValue The data set containing edges.
* @param cf The compute function
- *
+ *
* @param <K> The type of the vertex key (the vertex identifier).
* @param <VV> The type of the vertex value (the state of the vertex).
* @param <Message> The type of the message sent between vertices along the edges.
* @param <EV> The type of the values that are associated with the edges.
- *
+ *
* @return An instance of the vertex-centric graph computation operator.
*/
public static <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges(
@@ -248,16 +246,16 @@ public class VertexCentricIteration<K, VV, EV, Message>
/**
* Creates a new vertex-centric iteration operator for graphs where the edges are associated with a value (such as
* a weight or distance).
- *
+ *
* @param edgesWithValue The data set containing edges.
* @param cf The compute function.
* @param mc The function that combines messages sent to a vertex during a superstep.
- *
+ *
* @param <K> The type of the vertex key (the vertex identifier).
* @param <VV> The type of the vertex value (the state of the vertex).
* @param <Message> The type of the message sent between vertices along the edges.
* @param <EV> The type of the values that are associated with the edges.
- *
+ *
* @return An instance of the vertex-centric graph computation operator.
*/
public static <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges(
@@ -307,7 +305,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
return outTuple;
}
}
-
+
/**
* This coGroup class wraps the user-defined compute function.
* The first input holds a Tuple2 containing the vertex state and its inbox.
@@ -341,7 +339,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
}
this.computeFunction.preSuperstep();
}
-
+
@Override
public void close() throws Exception {
this.computeFunction.postSuperstep();
@@ -365,7 +363,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
// there are no messages during the 1st superstep
}
- else {
+ else {
messageIter.setFirst(first.f1.right());
@SuppressWarnings("unchecked")
Iterator<Tuple2<?, Either<NullValue, Message>>> downcastIter =
@@ -381,7 +379,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
@SuppressWarnings("serial")
@ForwardedFields("f0")
- public static class MessageCombinerUdf<K, Message> extends RichGroupReduceFunction<
+ private static class MessageCombinerUdf<K, Message> extends RichGroupReduceFunction<
Tuple2<K, Either<NullValue, Message>>, Tuple2<K, Either<NullValue, Message>>>
implements ResultTypeQueryable<Tuple2<K, Either<NullValue, Message>>>,
GroupCombineFunction<Tuple2<K, Either<NullValue, Message>>, Tuple2<K, Either<NullValue, Message>>> {
@@ -404,7 +402,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
@Override
public void reduce(Iterable<Tuple2<K, Either<NullValue, Message>>> messages,
Collector<Tuple2<K, Either<NullValue, Message>>> out) throws Exception {
-
+
final Iterator<Tuple2<K, Either<NullValue, Message>>> messageIterator = messages.iterator();
if (messageIterator.hasNext()) {
@@ -437,7 +435,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
// --------------------------------------------------------------------------------------------
/**
- * Helper method which sets up an iteration with the given vertex value
+ * Helper method which sets up an iteration with the given vertex value.
*
* @param iteration
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
index 93b3a8c..63ba087 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
@@ -32,7 +32,7 @@ import java.util.Collection;
* This class must be extended by functions that compute the state of the vertex depending on the old state and the
* incoming messages. The central method is {@link #updateVertex(Vertex, MessageIterator)}, which is
* invoked once per vertex per superstep.
- *
+ *
* {@code <K>} The vertex key type.
* {@code <VV>} The vertex value type.
* {@code <Message>} The message type.
@@ -81,24 +81,24 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable {
* This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
* the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
* state is changed, it will trigger the sending of messages via the {@link ScatterFunction}.
- *
+ *
* @param vertex The vertex.
* @param inMessages The incoming messages to this vertex.
- *
+ *
* @throws Exception The computation may throw exceptions, which causes the superstep to fail.
*/
public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<Message> inMessages) throws Exception;
/**
* This method is executed once per superstep before the gather function is invoked for each vertex.
- *
+ *
* @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
*/
public void preSuperstep() throws Exception {}
/**
* This method is executed once per superstep after the gather function has been invoked for each vertex.
- *
+ *
* @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
*/
public void postSuperstep() throws Exception {}
@@ -106,16 +106,16 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable {
/**
* Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
*
- * This should be called at most once per updateVertex.
- *
+ * <p>This should be called at most once per updateVertex.
+ *
* @param newValue The new vertex value.
*/
public void setNewVertexValue(VV newValue) {
- if(setNewVertexValueCalled) {
+ if (setNewVertexValueCalled) {
throw new IllegalStateException("setNewVertexValue should only be called at most once per updateVertex");
}
setNewVertexValueCalled = true;
- if(isOptDegrees()) {
+ if (isOptDegrees()) {
outValWithDegrees.f1.f0 = newValue;
outWithDegrees.collect(outValWithDegrees);
} else {
@@ -126,7 +126,7 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable {
/**
* Gets the number of the superstep, starting at <tt>1</tt>.
- *
+ *
* @return The number of the current superstep.
*/
public int getSuperstepNumber() {
@@ -136,7 +136,7 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable {
/**
* Gets the iteration aggregator registered under the given name. The iteration aggregator combines
* all aggregates globally once per superstep and makes them available in the next superstep.
- *
+ *
* @param name The name of the aggregator.
* @return The aggregator registered under this name, or null, if no aggregator was registered.
*/
@@ -146,7 +146,7 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable {
/**
* Get the aggregated value that an aggregator computed in the previous iteration.
- *
+ *
* @param name The name of the aggregator.
* @return The aggregated value of the previous iteration.
*/
@@ -158,7 +158,7 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable {
* Gets the broadcast data set registered under the given name. Broadcast data sets
* are available on all parallel instances of a function. They can be registered via
* {@link org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForGatherFunction(String, org.apache.flink.api.java.DataSet)}.
- *
+ *
* @param name The name under which the broadcast set is registered.
* @return The broadcast data set.
*/
@@ -232,7 +232,7 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable {
* In order to hide the Tuple3(actualValue, inDegree, OutDegree) vertex value from the user,
* another function will be called from {@link org.apache.flink.graph.spargel.ScatterGatherIteration}.
*
- * This function will retrieve the vertex from the vertexState and will set its degrees, afterwards calling
+ * <p>This function will retrieve the vertex from the vertexState and will set its degrees, afterwards calling
* the regular updateVertex function.
*
* @param vertexState
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
index d6fdc8a..be36954 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
@@ -18,10 +18,10 @@
package org.apache.flink.graph.spargel;
-import java.util.Iterator;
-
import org.apache.flink.api.java.tuple.Tuple2;
+import java.util.Iterator;
+
/**
* An iterator that returns messages. The iterator is {@link java.lang.Iterable} at the same time to support
* the <i>foreach</i> syntax.
@@ -30,24 +30,23 @@ public final class MessageIterator<Message> implements Iterator<Message>, Iterab
private static final long serialVersionUID = 1L;
private transient Iterator<Tuple2<?, Message>> source;
-
-
- final void setSource(Iterator<Tuple2<?, Message>> source) {
+
+ void setSource(Iterator<Tuple2<?, Message>> source) {
this.source = source;
}
-
+
@Override
- public final boolean hasNext() {
+ public boolean hasNext() {
return this.source.hasNext();
}
-
+
@Override
- public final Message next() {
+ public Message next() {
return this.source.next().f1;
}
@Override
- public final void remove() {
+ public void remove() {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
index b99b5b7..0ffc441 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
@@ -34,7 +34,7 @@ import java.util.Iterator;
/**
* The base class for functions that produce messages between vertices as a part of a {@link ScatterGatherIteration}.
- *
+ *
* @param <K> The type of the vertex key (the vertex identifier).
* @param <VV> The type of the vertex value (the state of the vertex).
* @param <Message> The type of the message sent between vertices along the edges.
@@ -90,23 +90,23 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
/**
* This method is invoked once per superstep for each vertex that was changed in that superstep.
* It needs to produce the messages that will be received by vertices in the next superstep.
- *
+ *
* @param vertex The vertex that was changed.
- *
+ *
* @throws Exception The computation may throw exceptions, which causes the superstep to fail.
*/
public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
/**
* This method is executed once per superstep before the scatter function is invoked for each vertex.
- *
+ *
* @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
*/
public void preSuperstep() throws Exception {}
/**
* This method is executed once per superstep after the scatter function has been invoked for each vertex.
- *
+ *
* @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
*/
public void postSuperstep() throws Exception {}
@@ -115,11 +115,13 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
/**
* Gets an {@link java.lang.Iterable} with all edges. This method is mutually exclusive with
* {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
- * <p>
- * If the {@link EdgeDirection} is OUT (default), then this iterator contains outgoing edges.
- * If the {@link EdgeDirection} is IN, then this iterator contains incoming edges.
- * If the {@link EdgeDirection} is ALL, then this iterator contains both outgoing and incoming edges.
- *
+ *
+ * <p>If the {@link EdgeDirection} is OUT (default), then this iterator contains outgoing edges.
+ *
+ * <p>If the {@link EdgeDirection} is IN, then this iterator contains incoming edges.
+ *
+ * <p>If the {@link EdgeDirection} is ALL, then this iterator contains both outgoing and incoming edges.
+ *
* @return An iterator with all edges.
*/
@SuppressWarnings("unchecked")
@@ -135,11 +137,13 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
/**
* Sends the given message to all vertices that are targets of an edge of the changed vertex.
* This method is mutually exclusive to the method {@link #getEdges()} and may be called only once.
- * <p>
- * If the {@link EdgeDirection} is OUT (default), the message will be sent to out-neighbors.
- * If the {@link EdgeDirection} is IN, the message will be sent to in-neighbors.
- * If the {@link EdgeDirection} is ALL, the message will be sent to all neighbors.
- *
+ *
+ * <p>If the {@link EdgeDirection} is OUT (default), the message will be sent to out-neighbors.
+ *
+ * <p>If the {@link EdgeDirection} is IN, the message will be sent to in-neighbors.
+ *
+ * <p>If the {@link EdgeDirection} is ALL, the message will be sent to all neighbors.
+ *
* @param m The message to send.
*/
public void sendMessageToAllNeighbors(Message m) {
@@ -155,16 +159,16 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
Tuple next = (Tuple) edges.next();
/*
- * When EdgeDirection is OUT, the edges iterator only has the out-edges
- * of the vertex, i.e. the ones where this vertex is src.
+ * When EdgeDirection is OUT, the edges iterator only has the out-edges
+ * of the vertex, i.e. the ones where this vertex is src.
* next.getField(1) gives the neighbor of the vertex running this ScatterFunction.
*/
if (getDirection().equals(EdgeDirection.OUT)) {
outValue.f0 = next.getField(1);
}
/*
- * When EdgeDirection is IN, the edges iterator only has the in-edges
- * of the vertex, i.e. the ones where this vertex is trg.
+ * When EdgeDirection is IN, the edges iterator only has the in-edges
+ * of the vertex, i.e. the ones where this vertex is trg.
* next.getField(0) gives the neighbor of the vertex running this ScatterFunction.
*/
else if (getDirection().equals(EdgeDirection.IN)) {
@@ -188,7 +192,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
/**
* Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
* the next superstep will cause an exception due to a non-deliverable message.
- *
+ *
* @param target The key (id) of the target vertex to message.
* @param m The message.
*/
@@ -202,7 +206,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
/**
* Gets the number of the superstep, starting at <tt>1</tt>.
- *
+ *
* @return The number of the current superstep.
*/
public int getSuperstepNumber() {
@@ -212,7 +216,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
/**
* Gets the iteration aggregator registered under the given name. The iteration aggregator combines
* all aggregates globally once per superstep and makes them available in the next superstep.
- *
+ *
* @param name The name of the aggregator.
* @return The aggregator registered under this name, or null, if no aggregator was registered.
*/
@@ -222,7 +226,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
/**
* Get the aggregated value that an aggregator computed in the previous iteration.
- *
+ *
* @param name The name of the aggregator.
* @return The aggregated value of the previous iteration.
*/
@@ -234,7 +238,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
* Gets the broadcast data set registered under the given name. Broadcast data sets
* are available on all parallel instances of a function. They can be registered via
* {@link org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForScatterFunction(String, org.apache.flink.api.java.DataSet)}.
- *
+ *
* @param name The name under which the broadcast set is registered.
* @return The broadcast data set.
*/
@@ -277,9 +281,8 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
this.edgesUsed = false;
}
- private static final class EdgesIterator<K, EV>
- implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
- {
+ private static final class EdgesIterator<K, EV>
+ implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>> {
private Iterator<Edge<K, EV>> input;
private Edge<K, EV> edge = new Edge<>();
@@ -306,6 +309,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
public void remove() {
throw new UnsupportedOperationException();
}
+
@Override
public Iterator<Edge<K, EV>> iterator() {
return this;
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
index 4ac1ae1..6a62847 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
@@ -31,23 +31,23 @@ import java.util.List;
* degree of parallelism, to register aggregators and use broadcast sets in
* the {@link GatherFunction} and {@link ScatterFunction}
*
- * The VertexCentricConfiguration object is passed as an argument to
+ * <p>The ScatterGatherConfiguration object is passed as an argument to
* {@link org.apache.flink.graph.Graph#runScatterGatherIteration (
* org.apache.flink.graph.spargel.GatherFunction, org.apache.flink.graph.spargel.ScatterFunction, int,
* ScatterGatherConfiguration)}.
*/
public class ScatterGatherConfiguration extends IterationConfiguration {
- /** the broadcast variables for the scatter function **/
+ // the broadcast variables for the scatter function
private List<Tuple2<String, DataSet<?>>> bcVarsScatter = new ArrayList<>();
- /** the broadcast variables for the gather function **/
+ // the broadcast variables for the gather function
private List<Tuple2<String, DataSet<?>>> bcVarsGather = new ArrayList<>();
- /** flag that defines whether the degrees option is set **/
+ // flag that defines whether the degrees option is set
private boolean optDegrees = false;
- /** the direction in which the messages should be sent **/
+ // the direction in which the messages should be sent
private EdgeDirection direction = EdgeDirection.OUT;
public ScatterGatherConfiguration() {}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
index 9f5585d..e3f01a3 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
@@ -49,14 +49,14 @@ import java.util.Map;
/**
* This class represents iterative graph computations, programmed in a scatter-gather perspective.
* It is a special case of <i>Bulk Synchronous Parallel</i> computation.
- * <p>
- * Scatter-Gather algorithms operate on graphs, which are defined through vertices and edges. The
+ *
+ * <p>Scatter-Gather algorithms operate on graphs, which are defined through vertices and edges. The
* algorithms send messages along the edges and update the state of vertices based on
* the old state and the incoming messages. All vertices have an initial state.
* The computation terminates once no vertex updates its state any more.
* Additionally, a maximum number of iterations (supersteps) may be specified.
- * <p>
- * The computation is here represented by two functions:
+ *
+ * <p>The computation is here represented by two functions:
* <ul>
* <li>The {@link GatherFunction} receives incoming messages and may update the state for
* the vertex. If a state is updated, messages are sent from this vertex. Initially, all vertices are
@@ -64,9 +64,8 @@ import java.util.Map;
* <li>The {@link ScatterFunction} takes the new vertex state and sends messages along the outgoing
* edges of the vertex. The outgoing edges may optionally have an associated value, such as a weight.</li>
* </ul>
- * <p>
*
- * Scatter-Gather graph iterations are are run by calling
+ * <p>Scatter-Gather graph iterations are run by calling
* {@link Graph#runScatterGatherIteration(ScatterFunction, GatherFunction, int)}.
*
* @param <K> The type of the vertex key (the vertex identifier).
@@ -74,9 +73,8 @@ import java.util.Map;
* @param <Message> The type of the message sent between vertices along the edges.
* @param <EV> The type of the values that are associated with the edges.
*/
-public class ScatterGatherIteration<K, VV, Message, EV>
- implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>>
-{
+public class ScatterGatherIteration<K, VV, Message, EV>
+ implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>> {
private final ScatterFunction<K, VV, Message, EV> scatterFunction;
private final GatherFunction<K, VV, Message> gatherFunction;
@@ -95,9 +93,8 @@ public class ScatterGatherIteration<K, VV, Message, EV>
private ScatterGatherIteration(ScatterFunction<K, VV, Message, EV> sf,
GatherFunction<K, VV, Message> gf,
- DataSet<Edge<K, EV>> edgesWithValue,
- int maximumNumberOfIterations)
- {
+ DataSet<Edge<K, EV>> edgesWithValue,
+ int maximumNumberOfIterations) {
Preconditions.checkNotNull(sf);
Preconditions.checkNotNull(gf);
Preconditions.checkNotNull(edgesWithValue);
@@ -121,10 +118,10 @@ public class ScatterGatherIteration<K, VV, Message, EV>
/**
* Sets the input data set for this operator. In the case of this operator this input data set represents
* the set of vertices with their initial state.
- *
+ *
* @param inputData The input data set, which in the case of this operator represents the set of
* vertices with their initial state.
- *
+ *
* @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
*/
@Override
@@ -134,7 +131,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
/**
* Creates the operator that represents this scatter-gather graph computation.
- *
+ *
* @return The operator that represents this scatter-gather graph computation.
*/
@Override
@@ -163,7 +160,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
}
}
- if(this.configuration != null) {
+ if (this.configuration != null) {
scatterFunction.setDirection(this.configuration.getDirection());
} else {
scatterFunction.setDirection(EdgeDirection.OUT);
@@ -174,7 +171,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
// check whether the degrees option is set and, if so, compute the in and the out degrees and
// add them to the vertex value
- if(this.configuration != null && this.configuration.isOptDegrees()) {
+ if (this.configuration != null && this.configuration.isOptDegrees()) {
return createResultVerticesWithDegrees(graph, messagingDirection, messageTypeInfo, numberOfVertices);
} else {
return createResultSimpleVertex(messagingDirection, messageTypeInfo, numberOfVertices);
@@ -184,7 +181,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
/**
* Creates a new scatter-gather iteration operator for graphs where the edges are associated with a value (such as
* a weight or distance).
- *
+ *
* @param edgesWithValue The data set containing edges.
* @param sf The function that turns changed vertex states into messages along the edges.
* @param gf The function that updates the state of the vertices from the incoming messages.
@@ -193,13 +190,13 @@ public class ScatterGatherIteration<K, VV, Message, EV>
* @param <VV> The type of the vertex value (the state of the vertex).
* @param <Message> The type of the message sent between vertices along the edges.
* @param <EV> The type of the values that are associated with the edges.
- *
+ *
* @return An in stance of the scatter-gather graph computation operator.
*/
public static <K, VV, Message, EV> ScatterGatherIteration<K, VV, Message, EV> withEdges(
- DataSet<Edge<K, EV>> edgesWithValue, ScatterFunction<K, VV, Message, EV> sf,
- GatherFunction<K, VV, Message> gf, int maximumNumberOfIterations)
- {
+ DataSet<Edge<K, EV>> edgesWithValue, ScatterFunction<K, VV, Message, EV> sf,
+ GatherFunction<K, VV, Message> gf, int maximumNumberOfIterations) {
+
return new ScatterGatherIteration<>(sf, gf, edgesWithValue, maximumNumberOfIterations);
}
@@ -226,20 +223,18 @@ public class ScatterGatherIteration<K, VV, Message, EV>
/*
* UDF that encapsulates the message sending function for graphs where the edges have an associated value.
*/
- private static abstract class ScatterUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV>
+ private abstract static class ScatterUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV>
extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, VVWithDegrees>, Tuple2<K, Message>>
- implements ResultTypeQueryable<Tuple2<K, Message>>
- {
+ implements ResultTypeQueryable<Tuple2<K, Message>> {
+
private static final long serialVersionUID = 1L;
final ScatterFunction<K, VV, Message, EV> scatterFunction;
private transient TypeInformation<Tuple2<K, Message>> resultType;
-
private ScatterUdfWithEdgeValues(ScatterFunction<K, VV, Message, EV> scatterFunction,
- TypeInformation<Tuple2<K, Message>> resultType)
- {
+ TypeInformation<Tuple2<K, Message>> resultType) {
this.scatterFunction = scatterFunction;
this.resultType = resultType;
}
@@ -322,10 +317,10 @@ public class ScatterGatherIteration<K, VV, Message, EV>
}
}
- private static abstract class GatherUdf<K, VVWithDegrees, Message> extends RichCoGroupFunction<
- Tuple2<K, Message>, Vertex<K, VVWithDegrees>, Vertex<K, VVWithDegrees>>
- implements ResultTypeQueryable<Vertex<K, VVWithDegrees>>
- {
+ private abstract static class GatherUdf<K, VVWithDegrees, Message> extends RichCoGroupFunction<
+ Tuple2<K, Message>, Vertex<K, VVWithDegrees>, Vertex<K, VVWithDegrees>>
+ implements ResultTypeQueryable<Vertex<K, VVWithDegrees>> {
+
private static final long serialVersionUID = 1L;
final GatherFunction<K, VVWithDegrees, Message> gatherFunction;
@@ -334,10 +329,9 @@ public class ScatterGatherIteration<K, VV, Message, EV>
private transient TypeInformation<Vertex<K, VVWithDegrees>> resultType;
-
private GatherUdf(GatherFunction<K, VVWithDegrees, Message> gatherFunction,
- TypeInformation<Vertex<K, VVWithDegrees>> resultType)
- {
+ TypeInformation<Vertex<K, VVWithDegrees>> resultType) {
+
this.gatherFunction = gatherFunction;
this.resultType = resultType;
}
@@ -477,7 +471,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
// configure coGroup message function with name and broadcast variables
messages = messages.name("Messaging");
- if(this.configuration != null) {
+ if (this.configuration != null) {
for (Tuple2<String, DataSet<?>> e : this.configuration.getScatterBcastVars()) {
messages = messages.withBroadcastSet(e.f1, e.f0);
}
@@ -529,7 +523,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
}
/**
- * Helper method which sets up an iteration with the given vertex value(either simple or with degrees)
+ * Helper method which sets up an iteration with the given vertex value (either simple or with degrees).
*
* @param iteration
*/
@@ -582,7 +576,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
break;
case ALL:
messages = buildScatterFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices)
- .union(buildScatterFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ;
+ .union(buildScatterFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices));
break;
default:
throw new IllegalArgumentException("Illegal edge direction");
@@ -660,7 +654,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
break;
case ALL:
messages = buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices)
- .union(buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ;
+ .union(buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices));
break;
default:
throw new IllegalArgumentException("Illegal edge direction");
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java
index 0e3812d..c5b0fc8 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java
@@ -21,7 +21,7 @@ package org.apache.flink.graph.types.valuearray;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.MurmurHash;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.Preconditions;
@@ -53,7 +53,7 @@ implements ValueArray<IntValue> {
private transient int mark;
// hasher used to generate the normalized key
- private Murmur3_32 hash = new Murmur3_32(0x11d2d865);
+ private MurmurHash hash = new MurmurHash(0x11d2d865);
// hash result stored as normalized key
private IntValue hashValue = new IntValue();
@@ -120,7 +120,7 @@ implements ValueArray<IntValue> {
@Override
public String toString() {
StringBuilder sb = new StringBuilder("[");
- for (int idx = 0 ; idx < this.position ; idx++) {
+ for (int idx = 0; idx < this.position; idx++) {
sb.append(data[idx]);
if (idx < position - 1) {
sb.append(",");
@@ -178,7 +178,7 @@ implements ValueArray<IntValue> {
public void write(DataOutputView out) throws IOException {
out.writeInt(position);
- for (int i = 0 ; i < position ; i++) {
+ for (int i = 0; i < position; i++) {
out.writeInt(data[i]);
}
}
@@ -190,7 +190,7 @@ implements ValueArray<IntValue> {
ensureCapacity(position);
- for (int i = 0 ; i < position ; i++) {
+ for (int i = 0; i < position; i++) {
data[i] = in.readInt();
}
}
@@ -209,7 +209,7 @@ implements ValueArray<IntValue> {
hash.reset();
hash.hash(position);
- for (int i = 0 ; i < position ; i++) {
+ for (int i = 0; i < position; i++) {
hash.hash(data[i]);
}
@@ -226,7 +226,7 @@ implements ValueArray<IntValue> {
IntValueArray other = (IntValueArray) o;
int min = Math.min(position, other.position);
- for (int i = 0 ; i < min ; i++) {
+ for (int i = 0; i < min; i++) {
int cmp = Integer.compare(data[i], other.data[i]);
if (cmp != 0) {
@@ -245,7 +245,7 @@ implements ValueArray<IntValue> {
public int hashCode() {
int hash = 1;
- for (int i = 0 ; i < position ; i++) {
+ for (int i = 0; i < position; i++) {
hash = 31 * hash + data[i];
}
@@ -261,7 +261,7 @@ implements ValueArray<IntValue> {
return false;
}
- for (int i = 0 ; i < position ; i++) {
+ for (int i = 0; i < position; i++) {
if (data[i] != other.data[i]) {
return false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java
index bbc9bc5..7e1a6dc 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java
@@ -30,7 +30,7 @@ import java.io.IOException;
/**
* Specialized comparator for IntValueArray based on CopyableValueComparator.
*
- * This can be used for grouping keys but not for sorting keys.
+ * <p>This can be used for grouping keys but not for sorting keys.
*/
@Internal
public class IntValueArrayComparator extends TypeComparator<IntValueArray> {
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java
index 7c01e6c..b3b4a79 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java
@@ -21,7 +21,7 @@ package org.apache.flink.graph.types.valuearray;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.MurmurHash;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Preconditions;
@@ -54,7 +54,7 @@ implements ValueArray<LongValue> {
private transient int mark;
// hasher used to generate the normalized key
- private Murmur3_32 hash = new Murmur3_32(0xdf099ea8);
+ private MurmurHash hash = new MurmurHash(0xdf099ea8);
// hash result stored as normalized key
private IntValue hashValue = new IntValue();
@@ -121,7 +121,7 @@ implements ValueArray<LongValue> {
@Override
public String toString() {
StringBuilder sb = new StringBuilder("[");
- for (int idx = 0 ; idx < this.position ; idx++) {
+ for (int idx = 0; idx < this.position; idx++) {
sb.append(data[idx]);
if (idx < position - 1) {
sb.append(",");
@@ -179,7 +179,7 @@ implements ValueArray<LongValue> {
public void write(DataOutputView out) throws IOException {
out.writeInt(position);
- for (int i = 0 ; i < position ; i++) {
+ for (int i = 0; i < position; i++) {
out.writeLong(data[i]);
}
}
@@ -191,7 +191,7 @@ implements ValueArray<LongValue> {
ensureCapacity(position);
- for (int i = 0 ; i < position ; i++) {
+ for (int i = 0; i < position; i++) {
data[i] = in.readLong();
}
}
@@ -210,7 +210,7 @@ implements ValueArray<LongValue> {
hash.reset();
hash.hash(position);
- for (int i = 0 ; i < position ; i++) {
+ for (int i = 0; i < position; i++) {
hash.hash(data[i]);
}
@@ -227,7 +227,7 @@ implements ValueArray<LongValue> {
LongValueArray other = (LongValueArray) o;
int min = Math.min(position, other.position);
- for (int i = 0 ; i < min ; i++) {
+ for (int i = 0; i < min; i++) {
int cmp = Long.compare(data[i], other.data[i]);
if (cmp != 0) {
@@ -246,7 +246,7 @@ implements ValueArray<LongValue> {
public int hashCode() {
int hash = 1;
- for (int i = 0 ; i < position ; i++) {
+ for (int i = 0; i < position; i++) {
hash = 31 * hash + (int) (data[i] ^ data[i] >>> 32);
}
@@ -262,7 +262,7 @@ implements ValueArray<LongValue> {
return false;
}
- for (int i = 0 ; i < position ; i++) {
+ for (int i = 0; i < position; i++) {
if (data[i] != other.data[i]) {
return false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java
index 26c3da2..278b1a1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java
@@ -30,7 +30,7 @@ import java.io.IOException;
/**
* Specialized comparator for LongValueArray based on CopyableValueComparator.
*
- * This can be used for grouping keys but not for sorting keys.
+ * <p>This can be used for grouping keys but not for sorting keys.
*/
@Internal
public class LongValueArrayComparator extends TypeComparator<LongValueArray> {
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java
index bf247a2..6581550 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java
@@ -62,7 +62,7 @@ implements ValueArray<NullValue> {
@Override
public String toString() {
StringBuilder sb = new StringBuilder("[");
- for (int idx = 0 ; idx < this.position ; idx++) {
+ for (int idx = 0; idx < this.position; idx++) {
sb.append("∅");
if (idx < position - 1) {
sb.append(",");
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java
index 2228d6e..6d28cc6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java
@@ -30,7 +30,7 @@ import java.io.IOException;
/**
* Specialized comparator for NullValueArray based on CopyableValueComparator.
*
- * This can be used for grouping keys but not for sorting keys.
+ * <p>This can be used for grouping keys but not for sorting keys.
*/
@Internal
public class NullValueArrayComparator extends TypeComparator<NullValueArray> {
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
index 4699552..fabe990 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
@@ -21,7 +21,7 @@ package org.apache.flink.graph.types.valuearray;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.MurmurHash;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Preconditions;
@@ -33,12 +33,12 @@ import java.util.Iterator;
/**
* An array of {@link StringValue}.
- * <p>
- * Strings are serialized to a byte array. Concatenating arrays is as simple
+ *
+ * <p>Strings are serialized to a byte array. Concatenating arrays is as simple
* and fast as extending and copying byte arrays. Strings are serialized when
* individually added to {@code StringValueArray}.
- * <p>
- * For each string added to the array the length is first serialized using a
+ *
+ * <p>For each string added to the array the length is first serialized using a
* variable length integer. Then the string characters are serialized using a
* variable length encoding where the lower 128 ASCII/UFT-8 characters are
* encoded in a single byte. This ensures that common characters are serialized
@@ -74,7 +74,7 @@ implements ValueArray<StringValue> {
private transient int markPosition;
// hasher used to generate the normalized key
- private Murmur3_32 hash = new Murmur3_32(0x19264330);
+ private MurmurHash hash = new MurmurHash(0x19264330);
// hash result stored as normalized key
private IntValue hashValue = new IntValue();
@@ -276,7 +276,7 @@ implements ValueArray<StringValue> {
hash.reset();
hash.hash(position);
- for (int i = 0 ; i < position ; i++) {
+ for (int i = 0; i < position; i++) {
hash.hash(data[i]);
}
@@ -300,7 +300,7 @@ implements ValueArray<StringValue> {
return cmp;
}
- for (int i = 0 ; i < position ; i++) {
+ for (int i = 0; i < position; i++) {
cmp = Byte.compare(data[i], other.data[i]);
if (cmp != 0) {
@@ -319,7 +319,7 @@ implements ValueArray<StringValue> {
public int hashCode() {
int hash = 1;
- for (int i = 0 ; i < position ; i++) {
+ for (int i = 0; i < position; i++) {
hash = 31 * hash + data[i];
}
@@ -339,7 +339,7 @@ implements ValueArray<StringValue> {
return false;
}
- for (int i = 0 ; i < position ; i++) {
+ for (int i = 0; i < position; i++) {
if (data[i] != other.data[i]) {
return false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java
index df88a8e..9d1c0f7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java
@@ -32,7 +32,7 @@ import static org.apache.flink.graph.types.valuearray.StringValueArray.HIGH_BIT;
/**
* Specialized comparator for StringValueArray based on CopyableValueComparator.
*
- * This can be used for grouping keys but not for sorting keys.
+ * <p>This can be used for grouping keys but not for sorting keys.
*/
@Internal
public class StringValueArrayComparator extends TypeComparator<StringValueArray> {
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java
index 6e34b71..35ebd6b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java
@@ -30,7 +30,7 @@ import java.io.Serializable;
/**
* Basic interface for array types which reuse objects during serialization.
*
- * Value arrays are usable as grouping keys but not sorting keys.
+ * <p>Value arrays are usable as grouping keys but not sorting keys.
*
* @param <T> the {@link Value} type
*/
@@ -76,7 +76,7 @@ extends Iterable<T>, IOReadableWritable, Serializable, NormalizableKey<ValueArra
/**
* Saves the array index, which can be restored by calling {@code reset()}.
*
- * This is not serialized and is not part of the contract for
+ * <p>This is not serialized and is not part of the contract for
* {@link #equals(Object)}.
*/
void mark();
@@ -91,7 +91,7 @@ extends Iterable<T>, IOReadableWritable, Serializable, NormalizableKey<ValueArra
* expected to release the underlying data structure. This allows the array
* to be reused with minimal impact on the garbage collector.
*
- * This may reset the {@link #mark()} in order to allow arrays be shrunk.
+ * <p>This may reset the {@link #mark()} in order to allow arrays to be shrunk.
*/
void clear();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java
index b7b6282..2426550 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java
@@ -31,7 +31,7 @@ import org.apache.flink.types.Value;
* for creating a {@link ValueArray}. Algorithms must instantiate classes at
* runtime when the type information has been erased.
*
- * This mirrors creating {@link Value} using {@link CopyableValue#copy()}.
+ * <p>This mirrors creating {@link Value} using {@link CopyableValue#copy()}.
*/
public class ValueArrayFactory {
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple2Map.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple2Map.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple2Map.java
index 1e500ea..0a9afa2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple2Map.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple2Map.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
/**
- * Create a Tuple2 DataSet from the vertices of an Edge DataSet
+ * Create a Tuple2 DataSet from the vertices of an Edge DataSet.
*
* @param <K> edge ID type
* @param <EV> edge value type
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
index a050ee8..3b65933 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
/**
- * Create a Tuple3 DataSet from an Edge DataSet
+ * Create a Tuple3 DataSet from an Edge DataSet.
*
* @param <K> edge ID type
* @param <EV> edge value type
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
index 5292751..3e2ac23 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -30,8 +30,13 @@ import org.apache.flink.types.LongValue;
import static org.apache.flink.api.java.typeutils.ValueTypeInfo.LONG_VALUE_TYPE_INFO;
+/**
+ * {@link Graph} utilities.
+ */
public class GraphUtils {
+ private GraphUtils() {}
+
/**
* Count the number of elements in a DataSet.
*
@@ -64,7 +69,7 @@ public class GraphUtils {
/**
* The identity mapper returns the input as output.
*
- * This does not forward fields and is used to break an operator chain.
+ * <p>This does not forward fields and is used to break an operator chain.
*
* @param <T> element type
*/
@@ -107,7 +112,7 @@ public class GraphUtils {
@Override
public TypeInformation<O> getProducedType() {
- return (TypeInformation<O>)TypeExtractor.createTypeInfo(value.getClass());
+ return (TypeInformation<O>) TypeExtractor.createTypeInfo(value.getClass());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java
deleted file mode 100644
index f48feb2..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils;
-
-import java.io.Serializable;
-
-/**
- * A resettable implementation of the 32-bit MurmurHash algorithm.
- */
-public class Murmur3_32 implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- // initial seed, which can be reset
- private final int seed;
-
- // number of 32-bit values processed
- private int count;
-
- // in-progress hash value
- private int hash;
-
- /**
- * A resettable implementation of the 32-bit MurmurHash algorithm.
- *
- * @param seed MurmurHash seed
- */
- public Murmur3_32(int seed) {
- this.seed = seed;
- reset();
- }
-
- /**
- * Re-initialize the MurmurHash state.
- *
- * @return this
- */
- public Murmur3_32 reset() {
- count = 0;
- hash = seed;
- return this;
- }
-
- /**
- * Process a {@code double} value.
- *
- * @param input 64-bit input value
- * @return this
- */
- public Murmur3_32 hash(double input) {
- hash(Double.doubleToLongBits(input));
- return this;
- }
-
- /**
- * Process a {@code float} value.
- *
- * @param input 32-bit input value
- * @return this
- */
- public Murmur3_32 hash(float input) {
- hash(Float.floatToIntBits(input));
- return this;
- }
-
- /**
- * Process an {@code integer} value.
- *
- * @param input 32-bit input value
- * @return this
- */
- public Murmur3_32 hash(int input) {
- count++;
-
- input *= 0xcc9e2d51;
- input = Integer.rotateLeft(input, 15);
- input *= 0x1b873593;
-
- hash ^= input;
- hash = Integer.rotateLeft(hash, 13);
- hash = hash * 5 + 0xe6546b64;
-
- return this;
- }
-
- /**
- * Process a {@code long} value.
- *
- * @param input 64-bit input value
- * @return this
- */
- public Murmur3_32 hash(long input) {
- hash((int)(input >>> 32));
- hash((int)input);
- return this;
- }
-
- /**
- * Finalize and return the MurmurHash output.
- *
- * @return 32-bit hash
- */
- public int hash() {
- hash ^= 4 * count;
- hash ^= hash >>> 16;
- hash *= 0x85ebca6b;
- hash ^= hash >>> 13;
- hash *= 0xc2b2ae35;
- hash ^= hash >>> 16;
-
- return hash;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/MurmurHash.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/MurmurHash.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/MurmurHash.java
new file mode 100644
index 0000000..484e793
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/MurmurHash.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import java.io.Serializable;
+
+/**
+ * A resettable implementation of the 32-bit MurmurHash algorithm.
+ */
+public class MurmurHash implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // initial seed; reset() restores the in-progress hash to this value
+ private final int seed;
+
+ // number of 32-bit values processed
+ private int count;
+
+ // in-progress hash value
+ private int hash;
+
+ /**
+ * Creates a resettable 32-bit MurmurHash hasher initialized with the given seed.
+ *
+ * @param seed MurmurHash seed
+ */
+ public MurmurHash(int seed) {
+ this.seed = seed;
+ reset();
+ }
+
+ /**
+ * Re-initialize the MurmurHash state.
+ *
+ * @return this
+ */
+ public MurmurHash reset() {
+ count = 0;
+ hash = seed;
+ return this;
+ }
+
+ /**
+ * Process a {@code double} value.
+ *
+ * @param input 64-bit input value
+ * @return this
+ */
+ public MurmurHash hash(double input) {
+ hash(Double.doubleToLongBits(input));
+ return this;
+ }
+
+ /**
+ * Process a {@code float} value.
+ *
+ * @param input 32-bit input value
+ * @return this
+ */
+ public MurmurHash hash(float input) {
+ hash(Float.floatToIntBits(input));
+ return this;
+ }
+
+ /**
+ * Process an {@code int} value.
+ *
+ * @param input 32-bit input value
+ * @return this
+ */
+ public MurmurHash hash(int input) {
+ count++;
+
+ input *= 0xcc9e2d51;
+ input = Integer.rotateLeft(input, 15);
+ input *= 0x1b873593;
+
+ hash ^= input;
+ hash = Integer.rotateLeft(hash, 13);
+ hash = hash * 5 + 0xe6546b64;
+
+ return this;
+ }
+
+ /**
+ * Process a {@code long} value.
+ *
+ * @param input 64-bit input value
+ * @return this
+ */
+ public MurmurHash hash(long input) {
+ hash((int) (input >>> 32));
+ hash((int) input);
+ return this;
+ }
+
+ /**
+ * Finalize and return the MurmurHash output.
+ *
+ * @return 32-bit hash
+ */
+ public int hash() {
+ hash ^= 4 * count;
+ hash ^= hash >>> 16;
+ hash *= 0x85ebca6b;
+ hash ^= hash >>> 13;
+ hash *= 0xc2b2ae35;
+ hash ^= hash >>> 16;
+
+ return hash;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToEdgeMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToEdgeMap.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToEdgeMap.java
index 5eb8287..23d7dd4 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToEdgeMap.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToEdgeMap.java
@@ -27,7 +27,7 @@ import org.apache.flink.types.NullValue;
/**
* Create an Edge from a Tuple2.
*
- * The new edge's value is set to {@link NullValue}.
+ * <p>The new edge's value is set to {@link NullValue}.
*
* @param <K> edge ID type
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
index 636ed7b..71639a7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Vertex;
/**
- * Create a Vertex DataSet from a Tuple2 DataSet
+ * Create a Vertex DataSet from a Tuple2 DataSet.
*
* @param <K> vertex ID type
* @param <VV> vertex value type
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
index 8ea54b4..4416e40 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
/**
- * Create an Edge DataSet from a Tuple3 DataSet
+ * Create an Edge DataSet from a Tuple3 DataSet.
*
* @param <K> edge ID type
* @param <EV> edge value type
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
index b1f996c..2a0310e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Vertex;
/**
- * Create a Tuple2 DataSet from a Vertex DataSet
+ * Create a Tuple2 DataSet from a Vertex DataSet.
*
* @param <K> vertex ID type
* @param <VV> vertex value type
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
index 11e7a64..838c021 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
@@ -18,13 +18,14 @@
package org.apache.flink.graph.utils.proxy;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.NoOpOperator;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -101,7 +102,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<T>> {
return true;
}
- if (! GraphAlgorithmWrappingDataSet.class.isAssignableFrom(obj.getClass())) {
+ if (!GraphAlgorithmWrappingDataSet.class.isAssignableFrom(obj.getClass())) {
return false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
index 69a6c37..c2aba4e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
@@ -18,8 +18,6 @@
package org.apache.flink.graph.utils.proxy;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.NoOpOperator;
import org.apache.flink.graph.Edge;
@@ -27,6 +25,9 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -107,7 +108,7 @@ implements GraphAlgorithm<IN_K, IN_VV, IN_EV, Graph<OUT_K, OUT_VV, OUT_EV>> {
return true;
}
- if (! GraphAlgorithmWrappingGraph.class.isAssignableFrom(obj.getClass())) {
+ if (!GraphAlgorithmWrappingGraph.class.isAssignableFrom(obj.getClass())) {
return false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java
index 7a7208a..e672434 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java
@@ -22,13 +22,16 @@ import org.apache.flink.graph.GraphAlgorithm;
/**
* A multi-state boolean.
- * <p>
- * This class is used by {@link GraphAlgorithm} configuration options to set a
+ *
+ * <p>This class is used by {@link GraphAlgorithm} configuration options to set a
* default value which can be overwritten. The default value is also used when
* algorithm configurations are merged and conflict.
*/
public class OptionalBoolean {
+ /**
+ * States for {@link OptionalBoolean}.
+ */
protected enum State {
UNSET,
FALSE,
@@ -114,10 +117,10 @@ public class OptionalBoolean {
}
/**
- * State transitions:
- * if the states are the same then no change
- * if either state is unset then change to the other state
- * if the states are conflicting then set to the conflicting state
+ * State transitions.
+ * - if the states are the same then no change
+ * - if either state is unset then change to the other state
+ * - if the states are conflicting then set to the conflicting state
*
* @param other object from which to merge state
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
index 75b672c..44635ca 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
@@ -18,13 +18,13 @@
package org.apache.flink.graph.validation;
-import java.io.Serializable;
-
import org.apache.flink.graph.Graph;
+import java.io.Serializable;
+
/**
* A utility for defining validation criteria for different types of Graphs.
- *
+ *
* @param <K> the vertex key type
* @param <VV> the vertex value type
* @param <EV> the edge value type
@@ -34,4 +34,4 @@ public abstract class GraphValidator<K, VV, EV> implements Serializable {
public abstract boolean validate(Graph<K, VV, EV> graph) throws Exception;
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index b620dd8..57aa987 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -28,13 +28,20 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;
+/**
+ * Validate that the edge set vertex IDs exist in the vertex set.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
@SuppressWarnings("serial")
public class InvalidVertexIdsValidator<K, VV, EV> extends GraphValidator<K, VV, EV> {
/**
* Checks that the edge set input contains valid vertex Ids, i.e. that they
* also exist in the vertex input set.
- *
+ *
* @return a boolean stating whether a graph is valid
* with respect to its vertex ids.
*/
@@ -71,4 +78,4 @@ public class InvalidVertexIdsValidator<K, VV, EV> extends GraphValidator<K, VV,
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index 469a23f..f89d4f5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -28,16 +28,20 @@ import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
+
import org.junit.Before;
import java.util.LinkedList;
import java.util.List;
+/**
+ * Simple graphs for testing graph assembly functions.
+ */
public class AsmTestBase {
protected ExecutionEnvironment env;
- protected final double ACCURACY = 0.000001;
+ protected static final double ACCURACY = 0.000001;
// simple graph
protected Graph<IntValue, NullValue, NullValue> directedSimpleGraph;
@@ -95,12 +99,14 @@ public class AsmTestBase {
* scale=10 and edgeFactor=16 but algorithms generating very large DataSets
* require smaller input graphs.
*
- * The examples program can write this graph as a CSV file for verifying
+ * <p>The examples program can write this graph as a CSV file for verifying
* algorithm results with external libraries:
*
+ * <pre>
* ./bin/flink run examples/flink-gelly-examples_*.jar --algorithm EdgeList \
* --input RMatGraph --type long --simplify directed --scale $SCALE --edge_factor $EDGE_FACTOR \
* --output csv --filename directedRMatGraph.csv
+ * </pre>
*
* @param scale vertices are generated in the range [0, 2<sup>scale</sup>)
* @param edgeFactor the edge count is {@code edgeFactor} * 2<sup>scale</sup>
@@ -122,12 +128,14 @@ public class AsmTestBase {
* scale=10 and edgeFactor=16 but algorithms generating very large DataSets
* require smaller input graphs.
*
- * The examples program can write this graph as a CSV file for verifying
+ * <p>The examples program can write this graph as a CSV file for verifying
* algorithm results with external libraries:
*
+ * <pre>
* ./bin/flink run examples/flink-gelly-examples_*.jar --algorithm EdgeList \
* --input RMatGraph --type long --simplify undirected --scale $SCALE --edge_factor $EDGE_FACTOR \
* --output csv --filename undirectedRMatGraph.csv
+ * </pre>
*
* @param scale vertices are generated in the range [0, 2<sup>scale</sup>)
* @param edgeFactor the edge count is {@code edgeFactor} * 2<sup>scale</sup>
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
index d25f9b6..7d82b80 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
@@ -18,10 +18,11 @@
package org.apache.flink.graph.asm.dataset;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+
+import org.apache.commons.lang3.ArrayUtils;
import org.junit.Before;
import org.junit.Test;
@@ -30,6 +31,9 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link ChecksumHashCode}.
+ */
public class ChecksumHashCodeTest {
private ExecutionEnvironment env;
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
index ec1af42..29b454b 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
@@ -18,9 +18,10 @@
package org.apache.flink.graph.asm.dataset;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+
+import org.apache.commons.lang3.ArrayUtils;
import org.junit.Before;
import org.junit.Test;
@@ -29,6 +30,9 @@ import java.util.List;
import static org.junit.Assert.assertArrayEquals;
+/**
+ * Tests for {@link Collect}.
+ */
public class CollectTest {
private ExecutionEnvironment env;
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
index 476c2e6..a1160ce 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
@@ -18,9 +18,10 @@
package org.apache.flink.graph.asm.dataset;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+
+import org.apache.commons.lang3.ArrayUtils;
import org.junit.Before;
import org.junit.Test;
@@ -29,6 +30,9 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link Count}.
+ */
public class CountTest {
private ExecutionEnvironment env;
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
index 18b52aa..22b47fe 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
@@ -29,10 +29,14 @@ import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link EdgeDegreesPair}.
+ */
public class EdgeDegreesPairTest
extends AsmTestBase {
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
index 097b9c8..f0d51d2 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
@@ -29,10 +29,14 @@ import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link EdgeSourceDegrees}.
+ */
public class EdgeSourceDegreesTest
extends AsmTestBase {
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
index b082088..6d58bb0 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
@@ -29,10 +29,14 @@ import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link EdgeTargetDegrees}.
+ */
public class EdgeTargetDegreesTest
extends AsmTestBase {
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
index d0aad8f..5214282 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
@@ -28,10 +28,14 @@ import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link VertexDegrees}.
+ */
public class VertexDegreesTest
extends AsmTestBase {
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
index 3cbcc74..f671cab 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
@@ -27,10 +27,14 @@ import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link VertexInDegree}.
+ */
public class VertexInDegreeTest
extends AsmTestBase {