You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/10/05 17:16:54 UTC
[2/4] flink git commit: [FLINK-4729] [gelly] Use optional
VertexCentric CombineFunction
[FLINK-4729] [gelly] Use optional VertexCentric CombineFunction
Passes the CombineFunction through to VertexCentricIteration, and applies
other code cleanup discovered via IntelliJ's code analyzer.
This closes #2587
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb34133e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb34133e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb34133e
Branch: refs/heads/master
Commit: bb34133ed30b2a6c74baa0e5342e278e039cbe2a
Parents: 8c7c42f
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Oct 3 13:59:38 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Oct 5 12:26:16 2016 -0400
----------------------------------------------------------------------
.../main/java/org/apache/flink/graph/Edge.java | 2 +-
.../main/java/org/apache/flink/graph/Graph.java | 165 +++++++++----------
.../org/apache/flink/graph/GraphAlgorithm.java | 2 +-
.../org/apache/flink/graph/GraphCsvReader.java | 10 +-
.../flink/graph/IterationConfiguration.java | 2 +-
.../java/org/apache/flink/graph/Triplet.java | 6 +-
.../apache/flink/graph/VertexJoinFunction.java | 5 +-
.../apache/flink/graph/gsa/ApplyFunction.java | 6 +-
.../flink/graph/gsa/GSAConfiguration.java | 6 +-
.../apache/flink/graph/gsa/GatherFunction.java | 6 +-
.../graph/gsa/GatherSumApplyIteration.java | 29 ++--
.../org/apache/flink/graph/gsa/SumFunction.java | 6 +-
.../library/GSASingleSourceShortestPaths.java | 6 +-
.../flink/graph/library/LabelPropagation.java | 2 +-
.../library/SingleSourceShortestPaths.java | 2 +-
.../flink/graph/library/TriangleEnumerator.java | 2 +-
.../flink/graph/pregel/ComputeFunction.java | 24 +--
.../flink/graph/pregel/MessageCombiner.java | 13 +-
.../pregel/VertexCentricConfiguration.java | 2 +-
.../graph/pregel/VertexCentricIteration.java | 45 +++--
.../flink/graph/spargel/GatherFunction.java | 10 +-
.../flink/graph/spargel/ScatterFunction.java | 12 +-
.../graph/spargel/ScatterGatherIteration.java | 5 +-
.../validation/InvalidVertexIdsValidator.java | 6 +-
.../graph/asm/translate/TranslateTest.java | 1 -
.../test/GatherSumApplyConfigurationITCase.java | 33 ++--
.../test/ScatterGatherConfigurationITCase.java | 17 +-
.../operations/GraphCreationWithCsvITCase.java | 5 +-
.../test/operations/GraphOperationsITCase.java | 55 +++----
.../ReduceOnNeighborMethodsITCase.java | 5 +-
30 files changed, 235 insertions(+), 255 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
index d84badb..2bcce29 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
@@ -46,7 +46,7 @@ public class Edge<K, V> extends Tuple3<K, K, V>{
* and the target is the original Edge's source.
*/
public Edge<K, V> reverse() {
- return new Edge<K, V>(this.f1, this.f0, this.f2);
+ return new Edge<>(this.f1, this.f0, this.f2);
}
public void setSource(K src) {
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 821b0a7..02d1eeb 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -66,6 +66,7 @@ import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@@ -160,7 +161,7 @@ public class Graph<K, VV, EV> {
public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Vertex<K, VV>> vertices,
DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
- return new Graph<K, VV, EV>(vertices, edges, context);
+ return new Graph<>(vertices, edges, context);
}
/**
@@ -177,15 +178,15 @@ public class Graph<K, VV, EV> {
DataSet<Vertex<K, NullValue>> vertices = edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct();
- return new Graph<K, NullValue, EV>(vertices, edges, context);
+ return new Graph<>(vertices, edges, context);
}
private static final class EmitSrcAndTarget<K, EV> implements FlatMapFunction<
Edge<K, EV>, Vertex<K, NullValue>> {
public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>> out) {
- out.collect(new Vertex<K, NullValue>(edge.f0, NullValue.getInstance()));
- out.collect(new Vertex<K, NullValue>(edge.f1, NullValue.getInstance()));
+ out.collect(new Vertex<>(edge.f0, NullValue.getInstance()));
+ out.collect(new Vertex<>(edge.f1, NullValue.getInstance()));
}
}
@@ -216,19 +217,19 @@ public class Graph<K, VV, EV> {
.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()).distinct()
.map(new MapFunction<Tuple1<K>, Vertex<K, VV>>() {
public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
- return new Vertex<K, VV>(value.f0, vertexValueInitializer.map(value.f0));
+ return new Vertex<>(value.f0, vertexValueInitializer.map(value.f0));
}
}).returns(returnType).withForwardedFields("f0");
- return new Graph<K, VV, EV>(vertices, edges, context);
+ return new Graph<>(vertices, edges, context);
}
private static final class EmitSrcAndTargetAsTuple1<K, EV> implements FlatMapFunction<
Edge<K, EV>, Tuple1<K>> {
public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
- out.collect(new Tuple1<K>(edge.f0));
- out.collect(new Tuple1<K>(edge.f1));
+ out.collect(new Tuple1<>(edge.f0));
+ out.collect(new Tuple1<>(edge.f1));
}
}
@@ -316,7 +317,7 @@ public class Graph<K, VV, EV> {
new MapFunction<Tuple2<K, K>, Edge<K, NullValue>>() {
public Edge<K, NullValue> map(Tuple2<K, K> input) {
- return new Edge<K, NullValue>(input.f0, input.f1, NullValue.getInstance());
+ return new Edge<>(input.f0, input.f1, NullValue.getInstance());
}
}).withForwardedFields("f0; f1");
return fromDataSet(edgeDataSet, context);
@@ -344,7 +345,7 @@ public class Graph<K, VV, EV> {
new MapFunction<Tuple2<K, K>, Edge<K, NullValue>>() {
public Edge<K, NullValue> map(Tuple2<K, K> input) {
- return new Edge<K, NullValue>(input.f0, input.f1, NullValue.getInstance());
+ return new Edge<>(input.f0, input.f1, NullValue.getInstance());
}
}).withForwardedFields("f0; f1");
return fromDataSet(edgeDataSet, vertexValueInitializer, context);
@@ -472,7 +473,7 @@ public class Graph<K, VV, EV> {
public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple4<K, K, VV, EV>> collector)
throws Exception {
- collector.collect(new Tuple4<K, K, VV, EV>(edge.getSource(), edge.getTarget(), vertex.getValue(),
+ collector.collect(new Tuple4<>(edge.getSource(), edge.getTarget(), vertex.getValue(),
edge.getValue()));
}
}
@@ -486,7 +487,7 @@ public class Graph<K, VV, EV> {
public void join(Tuple4<K, K, VV, EV> tripletWithSrcValSet,
Vertex<K, VV> vertex, Collector<Triplet<K, VV, EV>> collector) throws Exception {
- collector.collect(new Triplet<K, VV, EV>(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1,
+ collector.collect(new Triplet<>(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1,
tripletWithSrcValSet.f2, vertex.getValue(), tripletWithSrcValSet.f3));
}
}
@@ -521,13 +522,13 @@ public class Graph<K, VV, EV> {
DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
- return new Vertex<K, NV>(value.f0, mapper.map(value));
+ return new Vertex<>(value.f0, mapper.map(value));
}
})
.returns(returnType)
.withForwardedFields("f0");
- return new Graph<K, NV, EV>(mappedVertices, this.edges, this.context);
+ return new Graph<>(mappedVertices, this.edges, this.context);
}
/**
@@ -596,14 +597,13 @@ public class Graph<K, VV, EV> {
DataSet<Edge<K, NV>> mappedEdges = edges.map(
new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
public Edge<K, NV> map(Edge<K, EV> value) throws Exception {
- return new Edge<K, NV>(value.f0, value.f1, mapper
- .map(value));
+ return new Edge<>(value.f0, value.f1, mapper.map(value));
}
})
.returns(returnType)
.withForwardedFields("f0; f1");
- return new Graph<K, VV, NV>(this.vertices, mappedEdges, this.context);
+ return new Graph<>(this.vertices, mappedEdges, this.context);
}
/**
@@ -628,7 +628,7 @@ public class Graph<K, VV, EV> {
DataSet<Vertex<K, VV>> resultedVertices = this.getVertices()
.coGroup(inputDataSet).where(0).equalTo(0)
.with(new ApplyCoGroupToVertexValues<K, VV, T>(vertexJoinFunction));
- return new Graph<K, VV, EV>(resultedVertices, this.edges, this.context);
+ return new Graph<>(resultedVertices, this.edges, this.context);
}
private static final class ApplyCoGroupToVertexValues<K, VV, T>
@@ -651,7 +651,7 @@ public class Graph<K, VV, EV> {
if (inputIterator.hasNext()) {
final Tuple2<K, T> inputNext = inputIterator.next();
- collector.collect(new Vertex<K, VV>(inputNext.f0, vertexJoinFunction
+ collector.collect(new Vertex<>(inputNext.f0, vertexJoinFunction
.vertexJoin(vertexIterator.next().f1, inputNext.f1)));
} else {
collector.collect(vertexIterator.next());
@@ -681,7 +681,7 @@ public class Graph<K, VV, EV> {
DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
.coGroup(inputDataSet).where(0, 1).equalTo(0, 1)
.with(new ApplyCoGroupToEdgeValues<K, EV, T>(edgeJoinFunction));
- return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
+ return new Graph<>(this.vertices, resultedEdges, this.context);
}
private static final class ApplyCoGroupToEdgeValues<K, EV, T>
@@ -704,7 +704,7 @@ public class Graph<K, VV, EV> {
if (inputIterator.hasNext()) {
final Tuple3<K, K, T> inputNext = inputIterator.next();
- collector.collect(new Edge<K, EV>(inputNext.f0,
+ collector.collect(new Edge<>(inputNext.f0,
inputNext.f1, edgeJoinFunction.edgeJoin(
edgesIterator.next().f2, inputNext.f2)));
} else {
@@ -736,7 +736,7 @@ public class Graph<K, VV, EV> {
.coGroup(inputDataSet).where(0).equalTo(0)
.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction));
- return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
+ return new Graph<>(this.vertices, resultedEdges, this.context);
}
private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>
@@ -761,7 +761,7 @@ public class Graph<K, VV, EV> {
while (edgesIterator.hasNext()) {
Edge<K, EV> edgesNext = edgesIterator.next();
- collector.collect(new Edge<K, EV>(edgesNext.f0,
+ collector.collect(new Edge<>(edgesNext.f0,
edgesNext.f1, edgeJoinFunction.edgeJoin(edgesNext.f2, inputNext.f1)));
}
@@ -795,7 +795,7 @@ public class Graph<K, VV, EV> {
.coGroup(inputDataSet).where(1).equalTo(0)
.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction));
- return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
+ return new Graph<>(this.vertices, resultedEdges, this.context);
}
/**
@@ -817,8 +817,7 @@ public class Graph<K, VV, EV> {
DataSet<Edge<K, EV>> filteredEdges = remainingEdges.filter(edgeFilter);
- return new Graph<K, VV, EV>(filteredVertices, filteredEdges,
- this.context);
+ return new Graph<>(filteredVertices, filteredEdges, this.context);
}
/**
@@ -837,7 +836,7 @@ public class Graph<K, VV, EV> {
.join(filteredVertices).where(1).equalTo(0)
.with(new ProjectEdge<K, VV, EV>());
- return new Graph<K, VV, EV>(filteredVertices, remainingEdges, this.context);
+ return new Graph<>(filteredVertices, remainingEdges, this.context);
}
/**
@@ -850,7 +849,7 @@ public class Graph<K, VV, EV> {
public Graph<K, VV, EV> filterOnEdges(FilterFunction<Edge<K, EV>> edgeFilter) {
DataSet<Edge<K, EV>> filteredEdges = this.edges.filter(edgeFilter);
- return new Graph<K, VV, EV>(this.vertices, filteredEdges, this.context);
+ return new Graph<>(this.vertices, filteredEdges, this.context);
}
@ForwardedFieldsFirst("f0; f1; f2")
@@ -924,7 +923,7 @@ public class Graph<K, VV, EV> {
public Graph<K, VV, EV> getUndirected() {
DataSet<Edge<K, EV>> undirectedEdges = edges.flatMap(new RegularAndReversedEdgesMap<K, EV>());
- return new Graph<K, VV, EV>(vertices, undirectedEdges, this.context);
+ return new Graph<>(vertices, undirectedEdges, this.context);
}
/**
@@ -947,13 +946,13 @@ public class Graph<K, VV, EV> {
switch (direction) {
case IN:
return vertices.coGroup(edges).where(0).equalTo(1)
- .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction));
+ .with(new ApplyCoGroupFunction<>(edgesFunction));
case OUT:
return vertices.coGroup(edges).where(0).equalTo(0)
- .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction));
+ .with(new ApplyCoGroupFunction<>(edgesFunction));
case ALL:
return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
- .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction));
+ .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction));
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@@ -980,13 +979,13 @@ public class Graph<K, VV, EV> {
switch (direction) {
case IN:
return vertices.coGroup(edges).where(0).equalTo(1)
- .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
+ .with(new ApplyCoGroupFunction<>(edgesFunction)).returns(typeInfo);
case OUT:
return vertices.coGroup(edges).where(0).equalTo(0)
- .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
+ .with(new ApplyCoGroupFunction<>(edgesFunction)).returns(typeInfo);
case ALL:
return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
- .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
+ .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction)).returns(typeInfo);
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@@ -1013,14 +1012,14 @@ public class Graph<K, VV, EV> {
case IN:
return edges.map(new ProjectVertexIdMap<K, EV>(1))
.withForwardedFields("f1->f0")
- .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
+ .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction));
case OUT:
return edges.map(new ProjectVertexIdMap<K, EV>(0))
.withForwardedFields("f0")
- .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
+ .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction));
case ALL:
return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
- .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
+ .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction));
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@@ -1048,14 +1047,14 @@ public class Graph<K, VV, EV> {
case IN:
return edges.map(new ProjectVertexIdMap<K, EV>(1))
.withForwardedFields("f1->f0")
- .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
+ .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo);
case OUT:
return edges.map(new ProjectVertexIdMap<K, EV>(0))
.withForwardedFields("f0")
- .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
+ .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo);
case ALL:
return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
- .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
+ .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo);
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@@ -1072,7 +1071,7 @@ public class Graph<K, VV, EV> {
@SuppressWarnings("unchecked")
public Tuple2<K, Edge<K, EV>> map(Edge<K, EV> edge) {
- return new Tuple2<K, Edge<K, EV>>((K) edge.getField(fieldPosition), edge);
+ return new Tuple2<>((K) edge.getField(fieldPosition), edge);
}
}
@@ -1087,7 +1086,7 @@ public class Graph<K, VV, EV> {
@SuppressWarnings("unchecked")
public Tuple2<K, EV> map(Edge<K, EV> edge) {
- return new Tuple2<K, EV>((K) edge.getField(fieldPosition), edge.getValue());
+ return new Tuple2<>((K) edge.getField(fieldPosition), edge.getValue());
}
}
@@ -1114,8 +1113,8 @@ public class Graph<K, VV, EV> {
Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, Edge<K, EV>>> out) {
- out.collect(new Tuple2<K, Edge<K, EV>>(edge.getSource(), edge));
- out.collect(new Tuple2<K, Edge<K, EV>>(edge.getTarget(), edge));
+ out.collect(new Tuple2<>(edge.getSource(), edge));
+ out.collect(new Tuple2<>(edge.getTarget(), edge));
}
}
@@ -1123,8 +1122,8 @@ public class Graph<K, VV, EV> {
Edge<K, EV>, Tuple2<K, EV>> {
public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, EV>> out) {
- out.collect(new Tuple2<K, EV>(edge.getSource(), edge.getValue()));
- out.collect(new Tuple2<K, EV>(edge.getTarget(), edge.getValue()));
+ out.collect(new Tuple2<>(edge.getSource(), edge.getValue()));
+ out.collect(new Tuple2<>(edge.getTarget(), edge.getValue()));
}
}
@@ -1132,8 +1131,8 @@ public class Graph<K, VV, EV> {
Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> {
public void flatMap(Edge<K, EV> edge, Collector<Tuple3<K, K, Edge<K, EV>>> out) {
- out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getSource(), edge.getTarget(), edge));
- out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getTarget(), edge.getSource(), edge));
+ out.collect(new Tuple3<>(edge.getSource(), edge.getTarget(), edge));
+ out.collect(new Tuple3<>(edge.getTarget(), edge.getSource(), edge));
}
}
@@ -1224,7 +1223,7 @@ public class Graph<K, VV, EV> {
implements MapFunction<Edge<K, EV>, Edge<K, EV>> {
public Edge<K, EV> map(Edge<K, EV> value) {
- return new Edge<K, EV>(value.f1, value.f0, value.f2);
+ return new Edge<>(value.f1, value.f0, value.f2);
}
}
@@ -1233,8 +1232,8 @@ public class Graph<K, VV, EV> {
@Override
public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception {
- out.collect(new Edge<K, EV>(edge.f0, edge.f1, edge.f2));
- out.collect(new Edge<K, EV>(edge.f1, edge.f0, edge.f2));
+ out.collect(new Edge<>(edge.f0, edge.f1, edge.f2));
+ out.collect(new Edge<>(edge.f1, edge.f0, edge.f2));
}
}
@@ -1246,7 +1245,7 @@ public class Graph<K, VV, EV> {
*/
public Graph<K, VV, EV> reverse() throws UnsupportedOperationException {
DataSet<Edge<K, EV>> reversedEdges = edges.map(new ReverseEdgesMap<K, EV>());
- return new Graph<K, VV, EV>(vertices, reversedEdges, this.context);
+ return new Graph<>(vertices, reversedEdges, this.context);
}
/**
@@ -1290,7 +1289,7 @@ public class Graph<K, VV, EV> {
implements MapFunction<Edge<K, EV>, Tuple2<K, K>> {
@Override
public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception {
- return new Tuple2<K, K>(edge.f0, edge.f1);
+ return new Tuple2<>(edge.f0, edge.f1);
}
}
@@ -1302,7 +1301,7 @@ public class Graph<K, VV, EV> {
* @return the new graph containing the existing vertices as well as the one just added
*/
public Graph<K, VV, EV> addVertex(final Vertex<K, VV> vertex) {
- List<Vertex<K, VV>> newVertex = new ArrayList<Vertex<K, VV>>();
+ List<Vertex<K, VV>> newVertex = new ArrayList<>();
newVertex.add(vertex);
return addVertices(newVertex);
@@ -1353,7 +1352,7 @@ public class Graph<K, VV, EV> {
*/
public Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> target, EV edgeValue) {
Graph<K, VV, EV> partialGraph = fromCollection(Arrays.asList(source, target),
- Arrays.asList(new Edge<K, EV>(source.f0, target.f0, edgeValue)),
+ Collections.singletonList(new Edge<>(source.f0, target.f0, edgeValue)),
this.context);
return this.union(partialGraph);
}
@@ -1408,7 +1407,7 @@ public class Graph<K, VV, EV> {
*/
public Graph<K, VV, EV> removeVertex(Vertex<K, VV> vertex) {
- List<Vertex<K, VV>> vertexToBeRemoved = new ArrayList<Vertex<K, VV>>();
+ List<Vertex<K, VV>> vertexToBeRemoved = new ArrayList<>();
vertexToBeRemoved.add(vertex);
return removeVertices(vertexToBeRemoved);
@@ -1445,7 +1444,7 @@ public class Graph<K, VV, EV> {
.join(newVertices).where(1).equalTo(0)
.with(new ProjectEdge<K, VV, EV>());
- return new Graph<K, VV, EV>(newVertices, newEdges, context);
+ return new Graph<>(newVertices, newEdges, context);
}
private static final class VerticesRemovalCoGroup<K, VV> implements CoGroupFunction<Vertex<K, VV>, Vertex<K, VV>, Vertex<K, VV>> {
@@ -1515,7 +1514,7 @@ public class Graph<K, VV, EV> {
DataSet<Edge<K, EV>> newEdges = getEdges().coGroup(this.context.fromCollection(edgesToBeRemoved))
.where(0,1).equalTo(0,1).with(new EdgeRemovalCoGroup<K, EV>());
- return new Graph<K, VV, EV>(this.vertices, newEdges, context);
+ return new Graph<>(this.vertices, newEdges, context);
}
private static final class EdgeRemovalCoGroup<K,EV> implements CoGroupFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> {
@@ -1541,7 +1540,7 @@ public class Graph<K, VV, EV> {
public Graph<K, VV, EV> union(Graph<K, VV, EV> graph) {
DataSet<Vertex<K, VV>> unionedVertices = graph.getVertices().union(this.getVertices()).distinct();
DataSet<Edge<K, EV>> unionedEdges = graph.getEdges().union(this.getEdges());
- return new Graph<K, VV, EV>(unionedVertices, unionedEdges, this.context);
+ return new Graph<>(unionedVertices, unionedEdges, this.context);
}
/**
@@ -1689,7 +1688,7 @@ public class Graph<K, VV, EV> {
DataSet<Vertex<K, VV>> newVertices = this.getVertices().runOperation(iteration);
- return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
+ return new Graph<>(newVertices, this.edges, this.context);
}
/**
@@ -1706,7 +1705,7 @@ public class Graph<K, VV, EV> {
* after maximumNumberOfIterations.
*/
public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
- org.apache.flink.graph.gsa.GatherFunction gatherFunction, SumFunction<VV, EV, M> sumFunction,
+ org.apache.flink.graph.gsa.GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations) {
return this.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
@@ -1727,7 +1726,7 @@ public class Graph<K, VV, EV> {
* after maximumNumberOfIterations.
*/
public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
- org.apache.flink.graph.gsa.GatherFunction gatherFunction, SumFunction<VV, EV, M> sumFunction,
+ org.apache.flink.graph.gsa.GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations,
GSAConfiguration parameters) {
@@ -1738,7 +1737,7 @@ public class Graph<K, VV, EV> {
DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);
- return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
+ return new Graph<>(newVertices, this.edges, this.context);
}
/**
@@ -1776,10 +1775,10 @@ public class Graph<K, VV, EV> {
VertexCentricConfiguration parameters) {
VertexCentricIteration<K, VV, EV, M> iteration = VertexCentricIteration.withEdges(
- edges, computeFunction, maximumNumberOfIterations);
+ edges, computeFunction, combiner, maximumNumberOfIterations);
iteration.configure(parameters);
DataSet<Vertex<K, VV>> newVertices = this.getVertices().runOperation(iteration);
- return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
+ return new Graph<>(newVertices, this.edges, this.context);
}
/**
@@ -1831,14 +1830,14 @@ public class Graph<K, VV, EV> {
.join(this.vertices).where(0).equalTo(0);
return vertices.coGroup(edgesWithSources)
.where(0).equalTo("f0.f1")
- .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction));
+ .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction));
case OUT:
// create <edge-targetVertex> pairs
DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
.join(this.vertices).where(1).equalTo(0);
return vertices.coGroup(edgesWithTargets)
.where(0).equalTo("f0.f0")
- .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction));
+ .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction));
case ALL:
// create <edge-sourceOrTargetVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
@@ -1848,7 +1847,7 @@ public class Graph<K, VV, EV> {
return vertices.coGroup(edgesWithNeighbors)
.where(0).equalTo(0)
- .with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction));
+ .with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction));
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@@ -1879,14 +1878,14 @@ public class Graph<K, VV, EV> {
.join(this.vertices).where(0).equalTo(0);
return vertices.coGroup(edgesWithSources)
.where(0).equalTo("f0.f1")
- .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+ .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).returns(typeInfo);
case OUT:
// create <edge-targetVertex> pairs
DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
.join(this.vertices).where(1).equalTo(0);
return vertices.coGroup(edgesWithTargets)
.where(0).equalTo("f0.f0")
- .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+ .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).returns(typeInfo);
case ALL:
// create <edge-sourceOrTargetVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
@@ -1896,7 +1895,7 @@ public class Graph<K, VV, EV> {
return vertices.coGroup(edgesWithNeighbors)
.where(0).equalTo(0)
- .with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+ .with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction)).returns(typeInfo);
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@@ -1927,7 +1926,7 @@ public class Graph<K, VV, EV> {
.with(new ProjectVertexIdJoin<K, VV, EV>(1))
.withForwardedFieldsFirst("f1->f0");
return edgesWithSources.groupBy(0).reduceGroup(
- new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction));
+ new ApplyNeighborGroupReduceFunction<>(neighborsFunction));
case OUT:
// create <edge-targetVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
@@ -1935,7 +1934,7 @@ public class Graph<K, VV, EV> {
.with(new ProjectVertexIdJoin<K, VV, EV>(0))
.withForwardedFieldsFirst("f0");
return edgesWithTargets.groupBy(0).reduceGroup(
- new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction));
+ new ApplyNeighborGroupReduceFunction<>(neighborsFunction));
case ALL:
// create <edge-sourceOrTargetVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
@@ -1944,7 +1943,7 @@ public class Graph<K, VV, EV> {
.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
return edgesWithNeighbors.groupBy(0).reduceGroup(
- new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction));
+ new ApplyNeighborGroupReduceFunction<>(neighborsFunction));
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@@ -1976,7 +1975,7 @@ public class Graph<K, VV, EV> {
.with(new ProjectVertexIdJoin<K, VV, EV>(1))
.withForwardedFieldsFirst("f1->f0");
return edgesWithSources.groupBy(0).reduceGroup(
- new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+ new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo);
case OUT:
// create <edge-targetVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
@@ -1984,7 +1983,7 @@ public class Graph<K, VV, EV> {
.with(new ProjectVertexIdJoin<K, VV, EV>(0))
.withForwardedFieldsFirst("f0");
return edgesWithTargets.groupBy(0).reduceGroup(
- new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+ new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo);
case ALL:
// create <edge-sourceOrTargetVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
@@ -1993,7 +1992,7 @@ public class Graph<K, VV, EV> {
.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
return edgesWithNeighbors.groupBy(0).reduceGroup(
- new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+ new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo);
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@@ -2031,7 +2030,7 @@ public class Graph<K, VV, EV> {
@SuppressWarnings("unchecked")
public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex,
Collector<Tuple2<K, VV>> out) {
- out.collect(new Tuple2<K, VV>((K) edge.getField(fieldPosition), otherVertex.getValue()));
+ out.collect(new Tuple2<>((K) edge.getField(fieldPosition), otherVertex.getValue()));
}
}
@@ -2047,7 +2046,7 @@ public class Graph<K, VV, EV> {
@SuppressWarnings("unchecked")
public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex,
Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
- out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>((K) edge.getField(fieldPosition), edge, otherVertex));
+ out.collect(new Tuple3<>((K) edge.getField(fieldPosition), edge, otherVertex));
}
}
@@ -2059,7 +2058,7 @@ public class Graph<K, VV, EV> {
public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
Collector<Tuple2<K, VV>> out) {
- out.collect(new Tuple2<K, VV>(keysWithEdge.f0, neighbor.getValue()));
+ out.collect(new Tuple2<>(keysWithEdge.f0, neighbor.getValue()));
}
}
@@ -2070,7 +2069,7 @@ public class Graph<K, VV, EV> {
public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
- out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>(keysWithEdge.f0, keysWithEdge.f2, neighbor));
+ out.collect(new Tuple3<>(keysWithEdge.f0, keysWithEdge.f2, neighbor));
}
}
@@ -2119,7 +2118,7 @@ public class Graph<K, VV, EV> {
@Override
public Tuple2<Edge<K, EV>, Vertex<K, VV>> next() {
Tuple3<K, Edge<K, EV>, Vertex<K, VV>> next = keysWithEdgesIterator.next();
- return new Tuple2<Edge<K, EV>, Vertex<K, VV>>(next.f1, next.f2);
+ return new Tuple2<>(next.f1, next.f2);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
index 08cf011..8ec9650 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
@@ -26,5 +26,5 @@ package org.apache.flink.graph;
*/
public interface GraphAlgorithm<K, VV, EV, T> {
- public T run(Graph<K, VV, EV> input) throws Exception;
+ T run(Graph<K, VV, EV> input) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
index f93abc4..4859e12 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
@@ -104,7 +104,7 @@ public class GraphCsvReader {
public <K, VV, EV> Graph<K, VV, EV> types(Class<K> vertexKey, Class<VV> vertexValue,
Class<EV> edgeValue) {
- DataSet<Tuple2<K, VV>> vertices = null;
+ DataSet<Tuple2<K, VV>> vertices;
if (edgeReader == null) {
throw new RuntimeException("The edges input file cannot be null!");
@@ -160,9 +160,9 @@ public class GraphCsvReader {
private static final long serialVersionUID = -2981792951286476970L;
public Tuple3<K, K, NullValue> map(Tuple2<K, K> edge) {
- return new Tuple3<K, K, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+ return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
}
- }).withForwardedFields("f0;f1");;
+ }).withForwardedFields("f0;f1");
return Graph.fromTupleDataSet(edges, executionContext);
}
@@ -179,7 +179,7 @@ public class GraphCsvReader {
@SuppressWarnings({ "serial", "unchecked" })
public <K, VV> Graph<K, VV, NullValue> vertexTypes(Class<K> vertexKey, Class<VV> vertexValue) {
- DataSet<Tuple2<K, VV>> vertices = null;
+ DataSet<Tuple2<K, VV>> vertices;
if (edgeReader == null) {
throw new RuntimeException("The edges input file cannot be null!");
@@ -189,7 +189,7 @@ public class GraphCsvReader {
.map(new MapFunction<Tuple2<K,K>, Tuple3<K, K, NullValue>>() {
public Tuple3<K, K, NullValue> map(Tuple2<K, K> input) {
- return new Tuple3<K, K, NullValue>(input.f0, input.f1, NullValue.getInstance());
+ return new Tuple3<>(input.f0, input.f1, NullValue.getInstance());
}
}).withForwardedFields("f0;f1");
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
index 0b98a27..964d20e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
@@ -37,7 +37,7 @@ public abstract class IterationConfiguration {
private int parallelism = -1;
/** the iteration aggregators **/
- private Map<String, Aggregator<?>> aggregators = new HashMap<String, Aggregator<?>>();
+ private Map<String, Aggregator<?>> aggregators = new HashMap<>();
/** flag that defines whether the solution set is kept in managed memory **/
private boolean unmanagedSolutionSet = false;
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
index dee3480..2ae0903 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
@@ -64,14 +64,14 @@ public class Triplet <K, VV, EV> extends Tuple5<K, K, VV, VV, EV> {
}
public Vertex<K, VV> getSrcVertex() {
- return new Vertex<K, VV>(this.f0, this.f2);
+ return new Vertex<>(this.f0, this.f2);
}
public Vertex<K, VV> getTrgVertex() {
- return new Vertex<K, VV>(this.f1, this.f3);
+ return new Vertex<>(this.f1, this.f3);
}
public Edge<K, EV> getEdge() {
- return new Edge<K, EV>(this.f0, this.f1, this.f4);
+ return new Edge<>(this.f0, this.f1, this.f4);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
index a30d1a2..f40dac9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
@@ -18,9 +18,10 @@
package org.apache.flink.graph;
-import java.io.Serializable;
-
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.DataSet;
+
+import java.io.Serializable;
/**
* Interface to be implemented by the transformation function
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
index e9add7c..f05c254 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -105,7 +105,7 @@ public abstract class ApplyFunction<K, VV, M> implements Serializable {
* @return The aggregator registered under this name, or null, if no aggregator was registered.
*/
public <T extends Aggregator<?>> T getIterationAggregator(String name) {
- return this.runtimeContext.<T>getIterationAggregator(name);
+ return this.runtimeContext.getIterationAggregator(name);
}
/**
@@ -115,7 +115,7 @@ public abstract class ApplyFunction<K, VV, M> implements Serializable {
* @return The aggregated value of the previous iteration.
*/
public <T extends Value> T getPreviousIterationAggregate(String name) {
- return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+ return this.runtimeContext.getPreviousIterationAggregate(name);
}
/**
@@ -126,7 +126,7 @@ public abstract class ApplyFunction<K, VV, M> implements Serializable {
* @return The broadcast data set.
*/
public <T> Collection<T> getBroadcastSet(String name) {
- return this.runtimeContext.<T>getBroadcastVariable(name);
+ return this.runtimeContext.getBroadcastVariable(name);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
index 8d24f16..079b4c7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
@@ -39,13 +39,13 @@ import java.util.List;
public class GSAConfiguration extends IterationConfiguration {
/** the broadcast variables for the gather function **/
- private List<Tuple2<String, DataSet<?>>> bcVarsGather = new ArrayList<Tuple2<String,DataSet<?>>>();
+ private List<Tuple2<String, DataSet<?>>> bcVarsGather = new ArrayList<>();
/** the broadcast variables for the sum function **/
- private List<Tuple2<String, DataSet<?>>> bcVarsSum = new ArrayList<Tuple2<String,DataSet<?>>>();
+ private List<Tuple2<String, DataSet<?>>> bcVarsSum = new ArrayList<>();
/** the broadcast variables for the apply function **/
- private List<Tuple2<String, DataSet<?>>> bcVarsApply = new ArrayList<Tuple2<String,DataSet<?>>>();
+ private List<Tuple2<String, DataSet<?>>> bcVarsApply = new ArrayList<>();
private EdgeDirection direction = EdgeDirection.OUT;
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
index d914f2a..90db9da 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -94,7 +94,7 @@ public abstract class GatherFunction<VV, EV, M> implements Serializable {
* @return The aggregator registered under this name, or null, if no aggregator was registered.
*/
public <T extends Aggregator<?>> T getIterationAggregator(String name) {
- return this.runtimeContext.<T>getIterationAggregator(name);
+ return this.runtimeContext.getIterationAggregator(name);
}
/**
@@ -104,7 +104,7 @@ public abstract class GatherFunction<VV, EV, M> implements Serializable {
* @return The aggregated value of the previous iteration.
*/
public <T extends Value> T getPreviousIterationAggregate(String name) {
- return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+ return this.runtimeContext.getPreviousIterationAggregate(name);
}
/**
@@ -115,7 +115,7 @@ public abstract class GatherFunction<VV, EV, M> implements Serializable {
* @return The broadcast data set.
*/
public <T> Collection<T> getBroadcastSet(String name) {
- return this.runtimeContext.<T>getBroadcastVariable(name);
+ return this.runtimeContext.getBroadcastVariable(name);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index d1b12f9..442ce68 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -39,7 +39,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.types.LongValue;
@@ -119,13 +118,9 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
// Prepare type information
TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertexDataSet.getType()).getTypeAt(0);
TypeInformation<M> messageType = TypeExtractor.createTypeInfo(gather, GatherFunction.class, gather.getClass(), 2);
- TypeInformation<Tuple2<K, M>> innerType = new TupleTypeInfo<Tuple2<K, M>>(keyType, messageType);
+ TypeInformation<Tuple2<K, M>> innerType = new TupleTypeInfo<>(keyType, messageType);
TypeInformation<Vertex<K, VV>> outputType = vertexDataSet.getType();
- // create a graph
- Graph<K, VV, EV> graph =
- Graph.fromDataSet(vertexDataSet, edgeDataSet, vertexDataSet.getExecutionEnvironment());
-
// check whether the numVertices option is set and, if so, compute the total number of vertices
// and set it within the gather, sum and apply functions
@@ -139,9 +134,9 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
}
// Prepare UDFs
- GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<K, VV, EV, M>(gather, innerType);
- SumUdf<K, VV, EV, M> sumUdf = new SumUdf<K, VV, EV, M>(sum, innerType);
- ApplyUdf<K, VV, EV, M> applyUdf = new ApplyUdf<K, VV, EV, M>(apply, outputType);
+ GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<>(gather, innerType);
+ SumUdf<K, VV, EV, M> sumUdf = new SumUdf<>(sum, innerType);
+ ApplyUdf<K, VV, EV, M> applyUdf = new ApplyUdf<>(apply, outputType);
final int[] zeroKeyPos = new int[] {0};
final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
@@ -268,11 +263,11 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
*
* @return An instance of the gather-sum-apply graph computation operator.
*/
- public static final <K, VV, EV, M> GatherSumApplyIteration<K, VV, EV, M>
+ public static <K, VV, EV, M> GatherSumApplyIteration<K, VV, EV, M>
withEdges(DataSet<Edge<K, EV>> edges, GatherFunction<VV, EV, M> gather,
SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply, int maximumNumberOfIterations) {
- return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations);
+ return new GatherSumApplyIteration<>(gather, sum, apply, edges, maximumNumberOfIterations);
}
// --------------------------------------------------------------------------------------------
@@ -295,7 +290,7 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
@Override
public Tuple2<K, M> map(Tuple2<K, Neighbor<VV, EV>> neighborTuple) {
M result = this.gatherFunction.gather(neighborTuple.f1);
- return new Tuple2<K, M>(neighborTuple.f0, result);
+ return new Tuple2<>(neighborTuple.f0, result);
}
@Override
@@ -337,7 +332,7 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
K key = arg0.f0;
M result = this.sumFunction.sum(arg0.f1, arg1.f1);
- return new Tuple2<K, M>(key, result);
+ return new Tuple2<>(key, result);
}
@Override
@@ -411,8 +406,8 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
- out.collect(new Tuple2<K, Neighbor<VV, EV>>(
- edge.getTarget(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue())));
+ out.collect(new Tuple2<>(
+ edge.getTarget(), new Neighbor<>(vertex.getValue(), edge.getValue())));
}
}
@@ -422,8 +417,8 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
- out.collect(new Tuple2<K, Neighbor<VV, EV>>(
- edge.getSource(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue())));
+ out.collect(new Tuple2<>(
+ edge.getSource(), new Neighbor<>(vertex.getValue(), edge.getValue())));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
index 68e8d27..e70af1f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
@@ -94,7 +94,7 @@ public abstract class SumFunction<VV, EV, M> implements Serializable {
* @return The aggregator registered under this name, or null, if no aggregator was registered.
*/
public <T extends Aggregator<?>> T getIterationAggregator(String name) {
- return this.runtimeContext.<T>getIterationAggregator(name);
+ return this.runtimeContext.getIterationAggregator(name);
}
/**
@@ -104,7 +104,7 @@ public abstract class SumFunction<VV, EV, M> implements Serializable {
* @return The aggregated value of the previous iteration.
*/
public <T extends Value> T getPreviousIterationAggregate(String name) {
- return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+ return this.runtimeContext.getPreviousIterationAggregate(name);
}
/**
@@ -115,7 +115,7 @@ public abstract class SumFunction<VV, EV, M> implements Serializable {
* @return The broadcast data set.
*/
public <T> Collection<T> getBroadcastSet(String name) {
- return this.runtimeContext.<T>getBroadcastVariable(name);
+ return this.runtimeContext.getBroadcastVariable(name);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
index f39d858..4656757 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
@@ -51,7 +51,7 @@ public class GSASingleSourceShortestPaths<K> implements
@Override
public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
- return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+ return input.mapVertices(new InitVerticesMapper<>(srcVertexId))
.runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
new UpdateDistance<K>(), maxIterations)
.getVertices();
@@ -85,7 +85,7 @@ public class GSASingleSourceShortestPaths<K> implements
public Double gather(Neighbor<Double, Double> neighbor) {
return neighbor.getNeighborValue() + neighbor.getEdgeValue();
}
- };
+ }
@SuppressWarnings("serial")
private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
@@ -93,7 +93,7 @@ public class GSASingleSourceShortestPaths<K> implements
public Double sum(Double newValue, Double currentValue) {
return Math.min(newValue, currentValue);
}
- };
+ }
@SuppressWarnings("serial")
private static final class UpdateDistance<K> extends ApplyFunction<K, Double, Double> {
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index 2d13dfd..96e5afc 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -112,7 +112,7 @@ public class LabelPropagation<K, VV extends Comparable<VV>, EV>
public static final class UpdateVertexLabel<K, VV extends Comparable<VV>> extends GatherFunction<K, VV, VV> {
public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> inMessages) {
- Map<VV, Long> labelsWithFrequencies = new HashMap<VV, Long>();
+ Map<VV, Long> labelsWithFrequencies = new HashMap<>();
long maxFrequency = 1;
VV mostFrequentLabel = vertex.getValue();
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 4ff4e79..5aa1ac1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -51,7 +51,7 @@ public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, D
@Override
public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
- return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+ return input.mapVertices(new InitVerticesMapper<>(srcVertexId))
.runScatterGatherIteration(new MinDistanceMessenger<K>(), new VertexDistanceUpdater<K>(),
maxIterations).getVertices();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
index 681d060..dabeb06 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -110,7 +110,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
private static final class DegreeCounter<K extends Comparable<K>, EV>
implements GroupReduceFunction<Edge<K, EV>, EdgeWithDegrees<K>> {
- final ArrayList<K> otherVertices = new ArrayList<K>();
+ final ArrayList<K> otherVertices = new ArrayList<>();
final EdgeWithDegrees<K> outputEdge = new EdgeWithDegrees<>();
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
index db64f63..08c15e9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
@@ -18,10 +18,6 @@
package org.apache.flink.graph.pregel;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.java.DataSet;
@@ -33,6 +29,10 @@ import org.apache.flink.types.Either;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
/**
* The base class for the message-passing functions between vertices as a part of a {@link VertexCentricIteration}.
*
@@ -86,7 +86,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
*/
public final Iterable<Edge<K, EV>> getEdges() {
verifyEdgeUsage();
- this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
+ this.edgeIterator.set(edges);
return this.edgeIterator;
}
@@ -100,7 +100,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
verifyEdgeUsage();
outMsg.setField(m, 1);
while (edges.hasNext()) {
- Tuple next = (Tuple) edges.next();
+ Tuple next = edges.next();
outMsg.setField(next.getField(1), 0);
out.collect(Either.Right(outMsg));
}
@@ -157,7 +157,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
* @return The aggregator registered under this name, or {@code null}, if no aggregator was registered.
*/
public final <T extends Aggregator<?>> T getIterationAggregator(String name) {
- return this.runtimeContext.<T>getIterationAggregator(name);
+ return this.runtimeContext.getIterationAggregator(name);
}
/**
@@ -167,7 +167,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
* @return The aggregated value of the previous iteration.
*/
public final <T extends Value> T getPreviousIterationAggregate(String name) {
- return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+ return this.runtimeContext.getPreviousIterationAggregate(name);
}
/**
@@ -179,7 +179,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
* @return The broadcast data set.
*/
public final <T> Collection<T> getBroadcastSet(String name) {
- return this.runtimeContext.<T>getBroadcastVariable(name);
+ return this.runtimeContext.getBroadcastVariable(name);
}
// --------------------------------------------------------------------------------------------
@@ -204,9 +204,9 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
void init(IterationRuntimeContext context) {
this.runtimeContext = context;
- this.outVertex = new Vertex<K, VV>();
- this.outMsg = new Tuple2<K, Message>();
- this.edgeIterator = new EdgesIterator<K, EV>();
+ this.outVertex = new Vertex<>();
+ this.outMsg = new Tuple2<>();
+ this.edgeIterator = new EdgesIterator<>();
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
index 70c8262..9398d8d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
@@ -18,15 +18,15 @@
package org.apache.flink.graph.pregel;
-import java.io.Serializable;
-
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Either;
import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector;
+import java.io.Serializable;
+
/**
- * The base class for combining messages sent during a {@link VertexCentricteration}.
+ * The base class for combining messages sent during a {@link VertexCentricIteration}.
*
* @param <K> The type of the vertex id
* @param <Message> The type of the message sent between vertices along the edges.
@@ -37,15 +37,12 @@ public abstract class MessageCombiner<K, Message> implements Serializable {
private Collector<Tuple2<K, Either<NullValue, Message>>> out;
- private K vertexId;
-
private Tuple2<K, Either<NullValue, Message>> outValue;
void set(K target, Collector<Tuple2<K, Either<NullValue, Message>>> collector) {
- this.vertexId = target;
this.out = collector;
- this.outValue = new Tuple2<K, Either<NullValue, Message>>();
- outValue.setField(vertexId, 0);
+ this.outValue = new Tuple2<>();
+ outValue.setField(target, 0);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
index 026e869..a0f793a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
@@ -37,7 +37,7 @@ import java.util.List;
public class VertexCentricConfiguration extends IterationConfiguration {
/** the broadcast variables for the compute function **/
- private List<Tuple2<String, DataSet<?>>> bcVars = new ArrayList<Tuple2<String,DataSet<?>>>();
+ private List<Tuple2<String, DataSet<?>>> bcVars = new ArrayList<>();
public VertexCentricConfiguration() {}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
index 8d96776..ebf2e8d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
@@ -18,24 +18,21 @@
package org.apache.flink.graph.pregel;
-import java.util.Iterator;
-import java.util.Map;
-
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
import org.apache.flink.api.java.operators.CoGroupOperator;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
+import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -50,6 +47,9 @@ import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
+import java.util.Iterator;
+import java.util.Map;
+
/**
* This class represents iterative graph computations, programmed in a vertex-centric perspective.
* It is a special case of <i>Bulk Synchronous Parallel</i> computation. The paradigm has also been
@@ -158,14 +158,14 @@ public class VertexCentricIteration<K, VV, EV, Message>
// prepare the type information
TypeInformation<K> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
TypeInformation<Tuple2<K, Message>> messageTypeInfo =
- new TupleTypeInfo<Tuple2<K, Message>>(keyType, messageType);
+ new TupleTypeInfo<>(keyType, messageType);
TypeInformation<Vertex<K, VV>> vertexType = initialVertices.getType();
TypeInformation<Either<Vertex<K, VV>, Tuple2<K, Message>>> intermediateTypeInfo =
- new EitherTypeInfo<Vertex<K, VV>, Tuple2<K, Message>>(vertexType, messageTypeInfo);
+ new EitherTypeInfo<>(vertexType, messageTypeInfo);
TypeInformation<Either<NullValue, Message>> nullableMsgTypeInfo =
- new EitherTypeInfo<NullValue, Message>(TypeExtractor.getForClass(NullValue.class), messageType);
+ new EitherTypeInfo<>(TypeExtractor.getForClass(NullValue.class), messageType);
TypeInformation<Tuple2<K, Either<NullValue, Message>>> workSetTypeInfo =
- new TupleTypeInfo<Tuple2<K, Either<NullValue, Message>>>(keyType, nullableMsgTypeInfo);
+ new TupleTypeInfo<>(keyType, nullableMsgTypeInfo);
DataSet<Tuple2<K, Either<NullValue, Message>>> initialWorkSet = initialVertices.map(
new InitializeWorkSet<K, VV, Message>()).returns(workSetTypeInfo);
@@ -183,7 +183,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
vertexType, nullableMsgTypeInfo));
VertexComputeUdf<K, VV, EV, Message> vertexUdf =
- new VertexComputeUdf<K, VV, EV, Message>(computeFunction, intermediateTypeInfo);
+ new VertexComputeUdf<>(computeFunction, intermediateTypeInfo);
CoGroupOperator<?, ?, Either<Vertex<K, VV>, Tuple2<K, Message>>> superstepComputation =
verticesWithMsgs.coGroup(edgesWithValue)
@@ -204,7 +204,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
if (combineFunction != null) {
MessageCombinerUdf<K, Message> combinerUdf =
- new MessageCombinerUdf<K, Message>(combineFunction, workSetTypeInfo);
+ new MessageCombinerUdf<>(combineFunction, workSetTypeInfo);
DataSet<Tuple2<K, Either<NullValue, Message>>> combinedMessages = allMessages
.groupBy(0).reduceGroup(combinerUdf)
@@ -237,12 +237,12 @@ public class VertexCentricIteration<K, VV, EV, Message>
*
* @return An instance of the vertex-centric graph computation operator.
*/
- public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges(
+ public static <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges(
DataSet<Edge<K, EV>> edgesWithValue, ComputeFunction<K, VV, EV, Message> cf,
int maximumNumberOfIterations) {
- return new VertexCentricIteration<K, VV, EV, Message>(cf, edgesWithValue, null,
- maximumNumberOfIterations);
+ return new VertexCentricIteration<>(cf, edgesWithValue, null,
+ maximumNumberOfIterations);
}
/**
@@ -260,12 +260,12 @@ public class VertexCentricIteration<K, VV, EV, Message>
*
* @return An instance of the vertex-centric graph computation operator.
*/
- public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges(
+ public static <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges(
DataSet<Edge<K, EV>> edgesWithValue, ComputeFunction<K, VV, EV, Message> cf,
MessageCombiner<K, Message> mc, int maximumNumberOfIterations) {
- return new VertexCentricIteration<K, VV, EV, Message>(cf, edgesWithValue, mc,
- maximumNumberOfIterations);
+ return new VertexCentricIteration<>(cf, edgesWithValue, mc,
+ maximumNumberOfIterations);
}
/**
@@ -297,7 +297,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
@Override
public void open(Configuration parameters) {
- outTuple = new Tuple2<K, Either<NullValue, Message>>();
+ outTuple = new Tuple2<>();
nullMessage = Either.Left(NullValue.getInstance());
outTuple.setField(nullMessage, 1);
}
@@ -308,12 +308,12 @@ public class VertexCentricIteration<K, VV, EV, Message>
}
}
- @SuppressWarnings("serial")
/**
* This coGroup class wraps the user-defined compute function.
* The first input holds a Tuple2 containing the vertex state and its inbox.
* The second input is an iterator of the out-going edges of this vertex.
*/
+ @SuppressWarnings("serial")
private static class VertexComputeUdf<K, VV, EV, Message> extends RichCoGroupFunction<
Tuple2<Vertex<K, VV>, Either<NullValue, Message>>, Edge<K, EV>,
Either<Vertex<K, VV>, Tuple2<K, Message>>>
@@ -469,8 +469,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
JoinFunction<Vertex<K, VV>, Tuple2<K, Either<NullValue, Message>>,
Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> {
- private Tuple2<Vertex<K, VV>, Either<NullValue, Message>> outTuple =
- new Tuple2<Vertex<K, VV>, Either<NullValue, Message>>();
+ private Tuple2<Vertex<K, VV>, Either<NullValue, Message>> outTuple = new Tuple2<>();
public Tuple2<Vertex<K, VV>, Either<NullValue, Message>> join(
Vertex<K, VV> vertex, Tuple2<K, Either<NullValue, Message>> message) {
@@ -498,7 +497,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
private static final class ProjectMessages<K, VV, Message> implements
FlatMapFunction<Either<Vertex<K, VV>, Tuple2<K, Message>>, Tuple2<K, Either<NullValue, Message>>> {
- private Tuple2<K, Either<NullValue, Message>> outTuple = new Tuple2<K, Either<NullValue, Message>>();
+ private Tuple2<K, Either<NullValue, Message>> outTuple = new Tuple2<>();
public void flatMap(Either<Vertex<K, VV>, Tuple2<K, Message>> value,
Collector<Tuple2<K, Either<NullValue, Message>>> out) {
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
index d56c0da..93b3a8c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
@@ -141,7 +141,7 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable {
* @return The aggregator registered under this name, or null, if no aggregator was registered.
*/
public <T extends Aggregator<?>> T getIterationAggregator(String name) {
- return this.runtimeContext.<T>getIterationAggregator(name);
+ return this.runtimeContext.getIterationAggregator(name);
}
/**
@@ -151,7 +151,7 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable {
* @return The aggregated value of the previous iteration.
*/
public <T extends Value> T getPreviousIterationAggregate(String name) {
- return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+ return this.runtimeContext.getPreviousIterationAggregate(name);
}
/**
@@ -163,7 +163,7 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable {
* @return The broadcast data set.
*/
public <T> Collection<T> getBroadcastSet(String name) {
- return this.runtimeContext.<T>getBroadcastVariable(name);
+ return this.runtimeContext.getBroadcastVariable(name);
}
// --------------------------------------------------------------------------------------------
@@ -243,8 +243,8 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable {
<VertexWithDegree> void updateVertexFromScatterGatherIteration(Vertex<K, VertexWithDegree> vertexState,
MessageIterator<Message> inMessages) throws Exception {
- Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0,
- ((Tuple3<VV, Long, Long>)vertexState.getValue()).f0);
+ Vertex<K, VV> vertex = new Vertex<>(vertexState.f0,
+ ((Tuple3<VV, Long, Long>) vertexState.getValue()).f0);
updateVertex(vertex, inMessages);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
index 336e73d..b99b5b7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
@@ -217,7 +217,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
* @return The aggregator registered under this name, or null, if no aggregator was registered.
*/
public <T extends Aggregator<?>> T getIterationAggregator(String name) {
- return this.runtimeContext.<T>getIterationAggregator(name);
+ return this.runtimeContext.getIterationAggregator(name);
}
/**
@@ -227,7 +227,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
* @return The aggregated value of the previous iteration.
*/
public <T extends Value> T getPreviousIterationAggregate(String name) {
- return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+ return this.runtimeContext.getPreviousIterationAggregate(name);
}
/**
@@ -239,7 +239,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
* @return The broadcast data set.
*/
public <T> Collection<T> getBroadcastSet(String name) {
- return this.runtimeContext.<T>getBroadcastVariable(name);
+ return this.runtimeContext.getBroadcastVariable(name);
}
// --------------------------------------------------------------------------------------------
@@ -266,8 +266,8 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
void init(IterationRuntimeContext context) {
this.runtimeContext = context;
- this.outValue = new Tuple2<K, Message>();
- this.edgeIterator = new EdgesIterator<K, EV>();
+ this.outValue = new Tuple2<>();
+ this.edgeIterator = new EdgesIterator<>();
}
void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out, K id) {
@@ -282,7 +282,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
{
private Iterator<Edge<K, EV>> input;
- private Edge<K, EV> edge = new Edge<K, EV>();
+ private Edge<K, EV> edge = new Edge<>();
void set(Iterator<Edge<K, EV>> input) {
this.input = input;
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
index fde305f..8049932 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
@@ -196,7 +196,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
*
* @return An instance of the scatter-gather graph computation operator.
*/
- public static final <K, VV, Message, EV> ScatterGatherIteration<K, VV, Message, EV> withEdges(
+ public static <K, VV, Message, EV> ScatterGatherIteration<K, VV, Message, EV> withEdges(
DataSet<Edge<K, EV>> edgesWithValue, ScatterFunction<K, VV, Message, EV> sf,
GatherFunction<K, VV, Message> gf, int maximumNumberOfIterations)
{
@@ -588,8 +588,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
throw new IllegalArgumentException("Illegal edge direction");
}
- GatherUdf<K, VV, Message> updateUdf =
- new GatherUdfSimpleVV<K, VV, Message>(gatherFunction, vertexTypes);
+ GatherUdf<K, VV, Message> updateUdf = new GatherUdfSimpleVV<>(gatherFunction, vertexTypes);
// build the update function (co group)
CoGroupOperator<?, ?, Vertex<K, VV>> updates =
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index 33d469b..b620dd8 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -50,8 +50,8 @@ public class InvalidVertexIdsValidator<K, VV, EV> extends GraphValidator<K, VV,
private static final class MapEdgeIds<K, EV> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
- out.collect(new Tuple1<K>(edge.f0));
- out.collect(new Tuple1<K>(edge.f1));
+ out.collect(new Tuple1<>(edge.f0));
+ out.collect(new Tuple1<>(edge.f1));
}
}
@@ -67,7 +67,7 @@ public class InvalidVertexIdsValidator<K, VV, EV> extends GraphValidator<K, VV,
private static final class KToTupleMap<K> implements MapFunction<K, Tuple1<K>> {
public Tuple1<K> map(K key) throws Exception {
- return new Tuple1<K>(key);
+ return new Tuple1<>(key);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
index 0cf026c..ba6bd05 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
@@ -33,7 +33,6 @@ import org.junit.Test;
import java.util.LinkedList;
import java.util.List;
-import static org.apache.flink.graph.asm.translate.Translate.translateVertexValues;
import static org.junit.Assert.assertEquals;
public class TranslateTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
index 1e44d5b..98ecf16 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
@@ -240,10 +239,10 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
// test bcast variable
@SuppressWarnings("unchecked")
- List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("gatherBcastSet");
- Assert.assertEquals(1, bcastSet.get(0));
- Assert.assertEquals(2, bcastSet.get(1));
- Assert.assertEquals(3, bcastSet.get(2));
+ List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("gatherBcastSet");
+ Assert.assertEquals(1, bcastSet.get(0).intValue());
+ Assert.assertEquals(2, bcastSet.get(1).intValue());
+ Assert.assertEquals(3, bcastSet.get(2).intValue());
// test aggregator
if (getSuperstepNumber() == 2) {
@@ -271,10 +270,10 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
// test bcast variable
@SuppressWarnings("unchecked")
- List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("sumBcastSet");
- Assert.assertEquals(4, bcastSet.get(0));
- Assert.assertEquals(5, bcastSet.get(1));
- Assert.assertEquals(6, bcastSet.get(2));
+ List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("sumBcastSet");
+ Assert.assertEquals(4, bcastSet.get(0).intValue());
+ Assert.assertEquals(5, bcastSet.get(1).intValue());
+ Assert.assertEquals(6, bcastSet.get(2).intValue());
// test aggregator
aggregator = getIterationAggregator("superstepAggregator");
@@ -286,7 +285,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
public Long sum(Long newValue, Long currentValue) {
long superstep = getSuperstepNumber();
aggregator.aggregate(superstep);
- return 0l;
+ return 0L;
}
}
@@ -300,10 +299,10 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
// test bcast variable
@SuppressWarnings("unchecked")
- List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("applyBcastSet");
- Assert.assertEquals(7, bcastSet.get(0));
- Assert.assertEquals(8, bcastSet.get(1));
- Assert.assertEquals(9, bcastSet.get(2));
+ List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("applyBcastSet");
+ Assert.assertEquals(7, bcastSet.get(0).intValue());
+ Assert.assertEquals(8, bcastSet.get(1).intValue());
+ Assert.assertEquals(9, bcastSet.get(2).intValue());
// test aggregator
aggregator = getIterationAggregator("superstepAggregator");
@@ -338,7 +337,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
private static final class DummySum extends SumFunction<Long, Long, Long> {
public Long sum(Long newValue, Long currentValue) {
- return 0l;
+ return 0L;
}
}
@@ -354,7 +353,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
public Long map(Vertex<Long, Long> value) {
- return 1l;
+ return 1L;
}
}
@@ -363,7 +362,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
@Override
public HashSet<Long> map(Vertex<Long, Long> value) throws Exception {
- HashSet<Long> h = new HashSet<Long>();
+ HashSet<Long> h = new HashSet<>();
h.add(value.getId());
return h;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
index fcd0d82..2213700 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
@@ -521,10 +520,10 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
// test bcast variable
@SuppressWarnings("unchecked")
- List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("messagingBcastSet");
- Assert.assertEquals(4, bcastSet.get(0));
- Assert.assertEquals(5, bcastSet.get(1));
- Assert.assertEquals(6, bcastSet.get(2));
+ List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("messagingBcastSet");
+ Assert.assertEquals(4, bcastSet.get(0).intValue());
+ Assert.assertEquals(5, bcastSet.get(1).intValue());
+ Assert.assertEquals(6, bcastSet.get(2).intValue());
// test number of vertices
Assert.assertEquals(5, getNumberOfVertices());
@@ -569,10 +568,10 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
// test bcast variable
@SuppressWarnings("unchecked")
- List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("updateBcastSet");
- Assert.assertEquals(1, bcastSet.get(0));
- Assert.assertEquals(2, bcastSet.get(1));
- Assert.assertEquals(3, bcastSet.get(2));
+ List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("updateBcastSet");
+ Assert.assertEquals(1, bcastSet.get(0).intValue());
+ Assert.assertEquals(2, bcastSet.get(1).intValue());
+ Assert.assertEquals(3, bcastSet.get(2).intValue());
// test aggregator
aggregator = getIterationAggregator("superstepAggregator");
http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
index 99c66ec..3ccdef0 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.graph.test.operations;
-import com.google.common.base.Charsets;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileInputSplit;
@@ -30,10 +29,12 @@ import org.apache.flink.types.NullValue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
import java.util.List;
@RunWith(Parameterized.class)
@@ -193,7 +194,7 @@ public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase {
tempFile.deleteOnExit();
OutputStreamWriter wrt = new OutputStreamWriter(
- new FileOutputStream(tempFile), Charsets.UTF_8
+ new FileOutputStream(tempFile), Charset.forName("UTF-8")
);
wrt.write(content);
wrt.close();