You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 13:41:38 UTC

[06/10] flink git commit: [FLINK-1594] [streaming] Added StreamGraphEdges

[FLINK-1594] [streaming] Added StreamGraphEdges


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/29a66155
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/29a66155
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/29a66155

Branch: refs/heads/master
Commit: 29a661551539ad96d31b92999d088ede9186b9ba
Parents: 6b4ee2a
Author: Gábor Hermann <re...@gmail.com>
Authored: Thu Feb 26 13:16:32 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 11:25:03 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/streaming/api/StreamEdge.java  |  72 ++++++++++++
 .../flink/streaming/api/StreamEdgeList.java     | 112 ++++++++++++++++++
 .../flink/streaming/api/StreamEdgeListTest.java | 116 +++++++++++++++++++
 3 files changed, 300 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/29a66155/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
new file mode 100644
index 0000000..8743233
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import java.util.List;
+
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
+
+/**
+ * A directed edge of the stream graph, leading from a source vertex to a target vertex.
+ * Vertices are referred to by their integer ids.
+ *
+ * <p>All fields are final. The list of selected names is stored and handed out by
+ * reference (no defensive copy is made), so changes made to that list by the caller
+ * are visible through this edge.
+ */
+public class StreamEdge {
+
+	private final int sourceVertex;
+	private final int targetVertex;
+	// NOTE(review): the meaning of the type number is not visible in this class.
+	private final int typeNumber;
+	private final List<String> selectedNames;
+	private final StreamPartitioner<?> outputPartitioner;
+
+	/**
+	 * Creates an edge from {@code sourceVertex} to {@code targetVertex}.
+	 *
+	 * @param sourceVertex id of the vertex the edge starts from
+	 * @param targetVertex id of the vertex the edge points to
+	 * @param typeNumber type number attached to the edge
+	 * @param selectedNames selected names attached to the edge, stored as given (not validated)
+	 * @param outputPartitioner partitioner attached to the edge, stored as given (not validated)
+	 */
+	public StreamEdge(int sourceVertex, int targetVertex, int typeNumber, List<String> selectedNames, StreamPartitioner<?> outputPartitioner) {
+		this.sourceVertex = sourceVertex;
+		this.targetVertex = targetVertex;
+		this.typeNumber = typeNumber;
+		this.selectedNames = selectedNames;
+		this.outputPartitioner = outputPartitioner;
+	}
+
+	/**
+	 * @return id of the vertex this edge starts from
+	 */
+	public int getSourceVertex() {
+		return sourceVertex;
+	}
+
+	/**
+	 * @return id of the vertex this edge points to
+	 */
+	public int getTargetVertex() {
+		return targetVertex;
+	}
+
+	/**
+	 * @return the type number given at construction
+	 */
+	public int getTypeNumber() {
+		return typeNumber;
+	}
+
+	/**
+	 * @return the very list instance given at construction (not a copy)
+	 */
+	public List<String> getSelectedNames() {
+		return selectedNames;
+	}
+
+	/**
+	 * @return the partitioner given at construction
+	 */
+	public StreamPartitioner<?> getOutputPartitioner() {
+		return outputPartitioner;
+	}
+
+	/**
+	 * Returns a human-readable description listing every field, labelled with the class name.
+	 */
+	@Override
+	public String toString() {
+		return "StreamEdge{" +
+				"sourceVertex=" + sourceVertex +
+				", targetVertex=" + targetVertex +
+				", typeNumber=" + typeNumber +
+				", selectedNames=" + selectedNames +
+				", outputPartitioner=" + outputPartitioner +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a66155/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java
new file mode 100644
index 0000000..85202ab
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Adjacency lists of the stream graph: for every registered vertex id it keeps the list
+ * of outgoing edges and the list of incoming edges. Several edges between the same pair
+ * of vertices are allowed.
+ *
+ * <p>A vertex has to be registered with {@link #addVertex(int)} before edges touching it
+ * can be added. Every operation that refers to an unregistered vertex fails with a
+ * {@link RuntimeException} naming the missing vertex.
+ */
+public class StreamEdgeList {
+
+	private final Map<Integer, List<StreamEdge>> outEdgeLists;
+	private final Map<Integer, List<StreamEdge>> inEdgeLists;
+
+	/**
+	 * Creates an edge list without any vertices.
+	 */
+	public StreamEdgeList() {
+		outEdgeLists = new HashMap<Integer, List<StreamEdge>>();
+		inEdgeLists = new HashMap<Integer, List<StreamEdge>>();
+	}
+
+	/**
+	 * Registers a vertex with empty in and out edge lists. Registering an id that is
+	 * already present is a no-op.
+	 *
+	 * @param vertexId id of the vertex to register
+	 */
+	public void addVertex(int vertexId) {
+		// Replacing the lists of an existing vertex would drop its edges here while the
+		// neighbouring vertices still reference them, so an existing vertex is left alone.
+		if (outEdgeLists.containsKey(vertexId)) {
+			return;
+		}
+
+		outEdgeLists.put(vertexId, new ArrayList<StreamEdge>());
+		inEdgeLists.put(vertexId, new ArrayList<StreamEdge>());
+	}
+
+	/**
+	 * Removes a vertex together with every edge that starts from or points to it.
+	 *
+	 * @param vertexId id of the vertex to remove
+	 * @throws RuntimeException if the vertex is not registered
+	 */
+	public void removeVertex(int vertexId) {
+		// Work on a snapshot: removeEdge modifies the very lists being read here.
+		List<StreamEdge> toRemove = new ArrayList<StreamEdge>(getOutEdges(vertexId));
+		toRemove.addAll(getInEdges(vertexId));
+
+		for (StreamEdge edge : toRemove) {
+			removeEdge(edge);
+		}
+
+		outEdgeLists.remove(vertexId);
+		inEdgeLists.remove(vertexId);
+	}
+
+	/**
+	 * Adds an edge to the out list of its source vertex and to the in list of its
+	 * target vertex.
+	 *
+	 * @param edge the edge to add
+	 * @throws RuntimeException if the source or the target vertex is not registered;
+	 *         in that case nothing is modified
+	 */
+	public void addEdge(StreamEdge edge) {
+		// Resolve both lists before touching either one, so that a missing target
+		// vertex cannot leave the edge registered on the source side only.
+		List<StreamEdge> outEdges = getOutEdges(edge.getSourceVertex());
+		List<StreamEdge> inEdges = getInEdges(edge.getTargetVertex());
+
+		outEdges.add(edge);
+		inEdges.add(edge);
+	}
+
+	/**
+	 * Removes every edge that has the same source and target vertex as the given edge
+	 * (not only the given instance).
+	 *
+	 * @param edge edge whose end points select the edges to remove
+	 * @throws RuntimeException if the source or the target vertex is not registered
+	 */
+	public void removeEdge(StreamEdge edge) {
+		int sourceId = edge.getSourceVertex();
+		int targetId = edge.getTargetVertex();
+		removeEdge(sourceId, targetId);
+	}
+
+	/**
+	 * Removes every edge leading from {@code sourceId} to {@code targetId}.
+	 *
+	 * @param sourceId id of the source vertex
+	 * @param targetId id of the target vertex
+	 * @throws RuntimeException if the source or the target vertex is not registered;
+	 *         in that case nothing is modified
+	 */
+	public void removeEdge(int sourceId, int targetId) {
+		List<StreamEdge> outEdges = getOutEdges(sourceId);
+		List<StreamEdge> inEdges = getInEdges(targetId);
+
+		Iterator<StreamEdge> outIterator = outEdges.iterator();
+		while (outIterator.hasNext()) {
+			if (outIterator.next().getTargetVertex() == targetId) {
+				outIterator.remove();
+			}
+		}
+
+		Iterator<StreamEdge> inIterator = inEdges.iterator();
+		while (inIterator.hasNext()) {
+			if (inIterator.next().getSourceVertex() == sourceId) {
+				inIterator.remove();
+			}
+		}
+	}
+
+	/**
+	 * Returns the edges starting from the given vertex. The returned list is the
+	 * internal one, not a copy.
+	 *
+	 * @param i id of the vertex
+	 * @return the outgoing edges of the vertex
+	 * @throws RuntimeException if the vertex is not registered
+	 */
+	public List<StreamEdge> getOutEdges(int i) {
+		return edgesOf(outEdgeLists, i);
+	}
+
+	/**
+	 * Returns the edges pointing to the given vertex. The returned list is the
+	 * internal one, not a copy.
+	 *
+	 * @param i id of the vertex
+	 * @return the incoming edges of the vertex
+	 * @throws RuntimeException if the vertex is not registered
+	 */
+	public List<StreamEdge> getInEdges(int i) {
+		return edgesOf(inEdgeLists, i);
+	}
+
+	/**
+	 * Looks up the edge list of a vertex in the given map, failing for unknown vertices.
+	 */
+	private static List<StreamEdge> edgesOf(Map<Integer, List<StreamEdge>> edgeLists, int vertexId) {
+		List<StreamEdge> edges = edgeLists.get(vertexId);
+
+		if (edges == null) {
+			throw new RuntimeException("No such vertex in stream graph: " + vertexId);
+		}
+
+		return edges;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/29a66155/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamEdgeListTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamEdgeListTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamEdgeListTest.java
new file mode 100644
index 0000000..c4ba987
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamEdgeListTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests adding and removing vertices and edges (including several edges between the
+ * same pair of vertices) in a {@link StreamEdgeList}.
+ */
+public class StreamEdgeListTest {
+
+	private StreamEdgeList edgeList;
+
+	@Before
+	public void init() {
+		edgeList = new StreamEdgeList();
+	}
+
+	@Test
+	public void test() {
+		edgeList.addVertex(1);
+		edgeList.addVertex(2);
+		edgeList.addVertex(3);
+
+
+		// add edges
+		StreamEdge edge1 = new StreamEdge(1, 2, -1, null, null);
+		StreamEdge edge2 = new StreamEdge(2, 3, -1, null, null);
+		StreamEdge edge3 = new StreamEdge(1, 3, -1, null, null);
+
+		edgeList.addEdge(edge1);
+		edgeList.addEdge(edge2);
+		edgeList.addEdge(edge3);
+
+		// check adding
+		checkIfSameElements(edgeList.getOutEdges(1), Arrays.asList(edge1, edge3));
+		checkIfSameElements(edgeList.getOutEdges(2), Arrays.asList(edge2));
+		checkIfSameElements(edgeList.getOutEdges(3), new ArrayList<StreamEdge>());
+
+		checkIfSameElements(edgeList.getInEdges(1), new ArrayList<StreamEdge>());
+		checkIfSameElements(edgeList.getInEdges(2), Arrays.asList(edge1));
+		checkIfSameElements(edgeList.getInEdges(3), Arrays.asList(edge2, edge3));
+
+		// add duplicate edges (same end points as edge1 and edge2)
+		StreamEdge edge1new = new StreamEdge(1, 2, -2, null, null);
+		StreamEdge edge2new = new StreamEdge(2, 3, -2, null, null);
+
+		edgeList.addEdge(edge1new);
+		edgeList.addEdge(edge2new);
+
+		// check adding
+		checkIfSameElements(edgeList.getOutEdges(1), Arrays.asList(edge1, edge1new, edge3));
+		checkIfSameElements(edgeList.getOutEdges(2), Arrays.asList(edge2, edge2new));
+		checkIfSameElements(edgeList.getOutEdges(3), new ArrayList<StreamEdge>());
+
+		checkIfSameElements(edgeList.getInEdges(1), new ArrayList<StreamEdge>());
+		checkIfSameElements(edgeList.getInEdges(2), Arrays.asList(edge1, edge1new));
+		checkIfSameElements(edgeList.getInEdges(3), Arrays.asList(edge2, edge2new, edge3));
+
+		// remove every edge leading from vertex 1 to vertex 2 (edge1 and edge1new)
+		edgeList.removeEdge(1, 2);
+
+		// check removing
+		checkIfSameElements(edgeList.getOutEdges(1), Arrays.asList(edge3));
+		checkIfSameElements(edgeList.getOutEdges(2), Arrays.asList(edge2, edge2new));
+
+		checkIfSameElements(edgeList.getInEdges(1), new ArrayList<StreamEdge>());
+		checkIfSameElements(edgeList.getInEdges(2), new ArrayList<StreamEdge>());
+
+		// add back an edge and delete a vertex
+		edgeList.addEdge(edge1);
+		edgeList.removeVertex(2);
+
+		// check removing: the edges of vertex 2 are gone and looking it up must fail
+		checkIfSameElements(edgeList.getOutEdges(1), Arrays.asList(edge3));
+		try {
+			// Call the getter on its own: wrapping it in a comparison against null would
+			// raise a NullPointerException by itself and hide a getter that does not throw.
+			edgeList.getOutEdges(2);
+			fail("Expected a RuntimeException for the out edges of removed vertex 2");
+		} catch (RuntimeException expected) {
+			// expected: vertex 2 is no longer part of the edge list
+		}
+		checkIfSameElements(edgeList.getOutEdges(3), new ArrayList<StreamEdge>());
+
+		checkIfSameElements(edgeList.getInEdges(1), new ArrayList<StreamEdge>());
+		try {
+			edgeList.getInEdges(2);
+			fail("Expected a RuntimeException for the in edges of removed vertex 2");
+		} catch (RuntimeException expected) {
+			// expected: vertex 2 is no longer part of the edge list
+		}
+		checkIfSameElements(edgeList.getInEdges(3), Arrays.asList(edge3));
+	}
+
+	/**
+	 * Asserts that both lists contain the same elements, ignoring order and multiplicity
+	 * (the lists are compared as sets).
+	 *
+	 * @param actual the list produced by the code under test
+	 * @param expected the elements the list is supposed to contain
+	 */
+	private <T> void checkIfSameElements(List<T> actual, List<T> expected) {
+		assertEquals(new HashSet<T>(expected), new HashSet<T>(actual));
+	}
+}
\ No newline at end of file