You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 13:41:38 UTC
[06/10] flink git commit: [FLINK-1594] [streaming] Added
StreamGraphEdges
[FLINK-1594] [streaming] Added StreamGraphEdges
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/29a66155
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/29a66155
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/29a66155
Branch: refs/heads/master
Commit: 29a661551539ad96d31b92999d088ede9186b9ba
Parents: 6b4ee2a
Author: Gábor Hermann <re...@gmail.com>
Authored: Thu Feb 26 13:16:32 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 11:25:03 2015 +0100
----------------------------------------------------------------------
.../apache/flink/streaming/api/StreamEdge.java | 72 ++++++++++++
.../flink/streaming/api/StreamEdgeList.java | 112 ++++++++++++++++++
.../flink/streaming/api/StreamEdgeListTest.java | 116 +++++++++++++++++++
3 files changed, 300 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/29a66155/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
new file mode 100644
index 0000000..8743233
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import java.util.List;
+
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
+
+public class StreamEdge {
+
+ final private int sourceVertex;
+ final private int targetVertex;
+ final private int typeNumber;
+ final private List<String> selectedNames;
+// private OutputSelector<?> outputSelector;
+ final private StreamPartitioner<?> outputPartitioner;
+
+ public StreamEdge(int sourceVertex, int targetVertex, int typeNumber, List<String> selectedNames, StreamPartitioner<?> outputPartitioner) {
+ this.sourceVertex = sourceVertex;
+ this.targetVertex = targetVertex;
+ this.typeNumber = typeNumber;
+ this.selectedNames = selectedNames;
+// this.outputSelector = outputSelector;
+ this.outputPartitioner = outputPartitioner;
+ }
+
+ public int getSourceVertex() {
+ return sourceVertex;
+ }
+
+ public int getTargetVertex() {
+ return targetVertex;
+ }
+
+ public int getTypeNumber() {
+ return typeNumber;
+ }
+
+ public List<String> getSelectedNames() {
+ return selectedNames;
+ }
+
+ public StreamPartitioner<?> getOutputPartitioner() {
+ return outputPartitioner;
+ }
+
+ @Override
+ public String toString() {
+ return "StreamGraphEdge{" +
+ "sourceVertex=" + sourceVertex +
+ ", targetVertex=" + targetVertex +
+ ", typeNumber=" + typeNumber +
+ ", selectedNames=" + selectedNames +
+ ", outputPartitioner=" + outputPartitioner +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a66155/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java
new file mode 100644
index 0000000..85202ab
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class StreamEdgeList {
+
+ private Map<Integer, List<StreamEdge>> outEdgeLists;
+ private Map<Integer, List<StreamEdge>> inEdgeLists;
+
+ public StreamEdgeList() {
+ outEdgeLists = new HashMap<Integer, List<StreamEdge>>();
+ inEdgeLists = new HashMap<Integer, List<StreamEdge>>();
+ }
+
+ public void addVertex(int vertexId) {
+ outEdgeLists.put(vertexId, new ArrayList<StreamEdge>());
+ inEdgeLists.put(vertexId, new ArrayList<StreamEdge>());
+ }
+
+ public void removeVertex(int vertexId) {
+ ArrayList<StreamEdge> toRemove = new ArrayList<StreamEdge>();
+
+ for (StreamEdge edge : outEdgeLists.get(vertexId)) {
+ toRemove.add(edge);
+ }
+
+ for (StreamEdge edge : inEdgeLists.get(vertexId)) {
+ toRemove.add(edge);
+ }
+
+ for (StreamEdge edge : toRemove) {
+ removeEdge(edge);
+ }
+
+ outEdgeLists.remove(vertexId);
+ inEdgeLists.remove(vertexId);
+ }
+
+ public void addEdge(StreamEdge edge) {
+ int sourceId = edge.getSourceVertex();
+ int targetId = edge.getTargetVertex();
+ outEdgeLists.get(sourceId).add(edge);
+ inEdgeLists.get(targetId).add(edge);
+ }
+
+ public void removeEdge(StreamEdge edge) {
+ int sourceId = edge.getSourceVertex();
+ int targetId = edge.getTargetVertex();
+ removeEdge(sourceId, targetId);
+ }
+
+ public void removeEdge(int sourceId, int targetId) {
+ Iterator<StreamEdge> outIterator = outEdgeLists.get(sourceId).iterator();
+ while (outIterator.hasNext()) {
+ StreamEdge edge = outIterator.next();
+
+ if (edge.getTargetVertex() == targetId) {
+ outIterator.remove();
+ }
+ }
+
+ Iterator<StreamEdge> inIterator = inEdgeLists.get(targetId).iterator();
+ while (inIterator.hasNext()) {
+ StreamEdge edge = inIterator.next();
+
+ if (edge.getSourceVertex() == sourceId) {
+ inIterator.remove();
+ }
+ }
+ }
+
+ public List<StreamEdge> getOutEdges(int i) {
+ List<StreamEdge> outEdges = outEdgeLists.get(i);
+
+ if (outEdges == null) {
+ throw new RuntimeException("No such vertex in stream graph: " + i);
+ }
+
+ return outEdges;
+ }
+
+ public List<StreamEdge> getInEdges(int i) {
+ List<StreamEdge> inEdges = inEdgeLists.get(i);
+
+ if (inEdges == null) {
+ throw new RuntimeException("No such vertex in stream graph: " + i);
+ }
+
+ return inEdges;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/29a66155/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamEdgeListTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamEdgeListTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamEdgeListTest.java
new file mode 100644
index 0000000..c4ba987
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamEdgeListTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class StreamEdgeListTest {
+
+ private StreamEdgeList edgeList;
+
+ @Before
+ public void init() {
+ edgeList = new StreamEdgeList();
+ }
+
+ @Test
+ public void test() {
+ edgeList.addVertex(1);
+ edgeList.addVertex(2);
+ edgeList.addVertex(3);
+
+
+ // add edges
+ StreamEdge edge1 = new StreamEdge(1, 2, -1, null, null);
+ StreamEdge edge2 = new StreamEdge(2, 3, -1, null, null);
+ StreamEdge edge3 = new StreamEdge(1, 3, -1, null, null);
+
+ edgeList.addEdge(edge1);
+ edgeList.addEdge(edge2);
+ edgeList.addEdge(edge3);
+
+ // check adding
+ checkIfSameElements(edgeList.getOutEdges(1), Arrays.asList(edge1, edge3));
+ checkIfSameElements(edgeList.getOutEdges(2), Arrays.asList(edge2));
+ checkIfSameElements(edgeList.getOutEdges(3), new ArrayList<StreamEdge>());
+
+ checkIfSameElements(edgeList.getInEdges(1), new ArrayList<StreamEdge>());
+ checkIfSameElements(edgeList.getInEdges(2), Arrays.asList(edge1));
+ checkIfSameElements(edgeList.getInEdges(3), Arrays.asList(edge2, edge3));
+
+ // add duplicate edges
+ StreamEdge edge1new = new StreamEdge(1, 2, -2, null, null);
+ StreamEdge edge2new = new StreamEdge(2, 3, -2, null, null);
+
+ edgeList.addEdge(edge1new);
+ edgeList.addEdge(edge2new);
+
+ // check adding
+ checkIfSameElements(edgeList.getOutEdges(1), Arrays.asList(edge1, edge1new, edge3));
+ checkIfSameElements(edgeList.getOutEdges(2), Arrays.asList(edge2, edge2new));
+ checkIfSameElements(edgeList.getOutEdges(3), new ArrayList<StreamEdge>());
+
+ checkIfSameElements(edgeList.getInEdges(1), new ArrayList<StreamEdge>());
+ checkIfSameElements(edgeList.getInEdges(2), Arrays.asList(edge1, edge1new));
+ checkIfSameElements(edgeList.getInEdges(3), Arrays.asList(edge2, edge2new, edge3));
+
+ // remove a duplicate edge
+ edgeList.removeEdge(1, 2);
+
+ // check removing
+ checkIfSameElements(edgeList.getOutEdges(1), Arrays.asList(edge3));
+ checkIfSameElements(edgeList.getOutEdges(2), Arrays.asList(edge2, edge2new));
+
+ checkIfSameElements(edgeList.getInEdges(1), new ArrayList<StreamEdge>());
+ checkIfSameElements(edgeList.getInEdges(2), new ArrayList<StreamEdge>());
+
+ // add back an edge and delete a vertex
+ edgeList.addEdge(edge1);
+ edgeList.removeVertex(2);
+
+ // check removing
+ checkIfSameElements(edgeList.getOutEdges(1), Arrays.asList(edge3));
+ try {
+ checkIfSameElements(edgeList.getOutEdges(2), null);
+ fail();
+ } catch (RuntimeException e) {
+ }
+ checkIfSameElements(edgeList.getOutEdges(3), new ArrayList<StreamEdge>());
+
+ checkIfSameElements(edgeList.getInEdges(1), new ArrayList<StreamEdge>());
+ try {
+ checkIfSameElements(edgeList.getInEdges(2), null);
+ fail();
+ } catch (RuntimeException e) {
+ }
+ checkIfSameElements(edgeList.getInEdges(3), Arrays.asList(edge3));
+ }
+
+ private <T> void checkIfSameElements(List<T> expected, List<T> result) {
+ assertEquals(new HashSet<T>(expected), new HashSet<T>(result));
+ }
+}
\ No newline at end of file