You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/09/18 13:37:01 UTC

incubator-gearpump git commit: [GEARPUMP-349] Optimize Graph topologicalOrderIterator performance

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 55d6d2f56 -> 175b08e64


[GEARPUMP-349] Optimize Graph topologicalOrderIterator performance

Author: huafengw <fv...@gmail.com>

Closes #223 from huafengw/graph.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/175b08e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/175b08e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/175b08e6

Branch: refs/heads/master
Commit: 175b08e64c3363a2da9c61e05dac5f01d1f8db2d
Parents: 55d6d2f
Author: huafengw <fv...@gmail.com>
Authored: Mon Sep 18 21:36:08 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Sep 18 21:36:13 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/gearpump/util/Graph.scala  | 190 +++++++++++--------
 .../org/apache/gearpump/util/GraphSpec.scala    |   6 +-
 .../experiments/pagerank/PageRankWorker.scala   |   4 +-
 .../akkastream/graph/GraphPartitioner.scala     |   8 +-
 .../materializer/LocalMaterializerImpl.scala    |   8 +-
 .../materializer/RemoteMaterializerImpl.scala   |   4 +-
 .../storm/util/GraphBuilderSpec.scala           |   4 +-
 .../gearpump/services/util/UpickleSpec.scala    |   4 +-
 .../org/apache/gearpump/streaming/DAG.scala     |   2 +-
 .../gearpump/streaming/StreamApplication.scala  |   6 +-
 .../streaming/appmaster/ClockService.scala      |   2 +-
 .../gearpump/streaming/dsl/plan/Planner.scala   |   2 +-
 .../org/apache/gearpump/streaming/DAGSpec.scala |   2 +-
 .../streaming/dsl/plan/PlannerSpec.scala        |   2 +-
 .../streaming/dsl/scalaapi/StreamAppSpec.scala  |   4 +-
 .../streaming/dsl/scalaapi/StreamSpec.scala     |   6 +-
 16 files changed, 139 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/core/src/main/scala/org/apache/gearpump/util/Graph.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/Graph.scala b/core/src/main/scala/org/apache/gearpump/util/Graph.scala
index f110f5f..5b48050 100644
--- a/core/src/main/scala/org/apache/gearpump/util/Graph.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/Graph.scala
@@ -17,31 +17,34 @@
  */
 
 package org.apache.gearpump.util
-import scala.annotation.tailrec
+
 import scala.collection.mutable
 import scala.language.implicitConversions
+import scala.util.{Failure, Success, Try}
 
 /**
  * Generic mutable Graph libraries.
  */
 class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serializable {
-
-  private val _vertices = mutable.Set.empty[N]
-  private val _edges = mutable.Set.empty[(N, E, N)]
+  private val LOG = LogUtil.getLogger(getClass)
+  private val vertices = mutable.Set.empty[N]
+  private val edges = mutable.Set.empty[(N, E, N)]
+  private val outEdges = mutable.Map.empty[N, mutable.Set[(N, E, N)]]
+  private val inEdges = mutable.Map.empty[N, mutable.Set[(N, E, N)]]
 
   // This is used to ensure the output of this Graph is always stable
-  // Like method vertices(), or edges()
-  private var _indexs = Map.empty[Any, Int]
-  private var _nextIndex = 0
+  // e.g. the lists returned by getVertices and getEdges
+  private var indexs = Map.empty[Any, Int]
+  private var nextIndex = 0
   private def nextId: Int = {
-    val result = _nextIndex
-    _nextIndex += 1
+    val result = nextIndex
+    nextIndex += 1
     result
   }
 
   private def init(): Unit = {
-    Option(vertexList).getOrElse(List.empty[N]).foreach(addVertex(_))
-    Option(edgeList).getOrElse(List.empty[(N, E, N)]).foreach(addEdge(_))
+    Option(vertexList).getOrElse(List.empty[N]).foreach(addVertex)
+    Option(edgeList).getOrElse(List.empty[(N, E, N)]).foreach(addEdge)
   }
 
   init()
@@ -51,20 +54,22 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
    * Current Graph is changed.
    */
   def addVertex(vertex: N): Unit = {
-    val result = _vertices.add(vertex)
+    val result = vertices.add(vertex)
     if (result) {
-      _indexs += vertex -> nextId
+      indexs += vertex -> nextId
     }
   }
 
   /**
-   * Add a edge
+   * Add an edge
    * Current Graph is changed.
    */
   def addEdge(edge: (N, E, N)): Unit = {
-    val result = _edges.add(edge)
+    val result = edges.add(edge)
     if (result) {
-      _indexs += edge -> nextId
+      indexs += edge -> nextId
+      outEdges += edge._1 -> (outgoingEdgesOf(edge._1) + edge)
+      inEdges += edge._3 -> (incomingEdgesOf(edge._3) + edge)
     }
   }
 
@@ -72,37 +77,44 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
    * return all vertices.
    * The result is stable
    */
-  def vertices: List[N] = {
+  def getVertices: List[N] = {
     // Sorts the vertex so that we can keep the order for mapVertex
-    _vertices.toList.sortBy(_indexs(_))
+    vertices.toList.sortBy(indexs(_))
   }
 
   /**
    * out degree
    */
   def outDegreeOf(node: N): Int = {
-    edges.count(_._1 == node)
+    outgoingEdgesOf(node).size
   }
 
   /**
    * in degree
    */
   def inDegreeOf(node: N): Int = {
-    edges.count(_._3 == node)
+    incomingEdgesOf(node).size
   }
 
   /**
    * out going edges.
    */
-  def outgoingEdgesOf(node: N): List[(N, E, N)] = {
-    edges.filter(_._1 == node)
+  def outgoingEdgesOf(node: N): mutable.Set[(N, E, N)] = {
+    outEdges.getOrElse(node, mutable.Set.empty)
   }
 
   /**
    * incoming edges.
    */
-  def incomingEdgesOf(node: N): List[(N, E, N)] = {
-    edges.filter(_._3 == node)
+  def incomingEdgesOf(node: N): mutable.Set[(N, E, N)] = {
+    inEdges.getOrElse(node, mutable.Set.empty)
+  }
+
+  /**
+   * Vertices directly reachable from the given node through its outgoing edges.
+   */
+  private def adjacentVertices(node: N): List[N] = {
+    outgoingEdgesOf(node).map(_._3).toList
   }
 
   /**
@@ -110,10 +122,12 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
    * Current Graph is changed.
    */
   def removeVertex(node: N): Unit = {
-    _vertices.remove(node)
-    _indexs -= node
+    vertices.remove(node)
+    indexs -= node
     val toBeRemoved = incomingEdgesOf(node) ++ outgoingEdgesOf(node)
-    toBeRemoved.foreach(removeEdge(_))
+    toBeRemoved.foreach(removeEdge)
+    outEdges -= node
+    inEdges -= node
   }
 
   /**
@@ -121,8 +135,10 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
    * Current Graph is changed.
    */
   private def removeEdge(edge: (N, E, N)): Unit = {
-    _indexs -= edge
-    _edges.remove(edge)
+    indexs -= edge
+    edges.remove(edge)
+    inEdges.update(edge._3, inEdges(edge._3) - edge)
+    outEdges.update(edge._1, outEdges(edge._1) - edge)
   }
 
   /**
@@ -140,14 +156,14 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
    * Current Graph is not changed.
    */
   def mapVertex[NewNode](fun: N => NewNode): Graph[NewNode, E] = {
-    val vertexes = vertices.map(node => (node, fun(node)))
+    val newVertices = getVertices.map(node => (node, fun(node)))
 
-    val vertexMap: Map[N, NewNode] = vertexes.toMap
+    val vertexMap: Map[N, NewNode] = newVertices.toMap
 
-    val newEdges = edges.map { edge =>
+    val newEdges = getEdges.map { edge =>
       (vertexMap(edge._1), edge._2, vertexMap(edge._3))
     }
-    new Graph(vertexes.map(_._2), newEdges)
+    new Graph(newVertices.map(_._2), newEdges)
   }
 
   /**
@@ -155,24 +171,25 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
    * Current graph is not changed.
    */
   def mapEdge[NewEdge](fun: (N, E, N) => NewEdge): Graph[N, NewEdge] = {
-    val newEdges = edges.map { edge =>
+    val newEdges = getEdges.map { edge =>
       (edge._1, fun(edge._1, edge._2, edge._3), edge._3)
     }
-    new Graph(vertices, newEdges)
+    new Graph(getVertices, newEdges)
   }
 
   /**
    * edges connected to node
    */
   def edgesOf(node: N): List[(N, E, N)] = {
-    (incomingEdgesOf(node) ++ outgoingEdgesOf(node)).toSet[(N, E, N)].toList.sortBy(_indexs(_))
+    (incomingEdgesOf(node) ++ outgoingEdgesOf(node)).toList
   }
 
   /**
    * all edges
    */
-  def edges: List[(N, E, N)] = {
-    _edges.toList.sortBy(_indexs(_))
+  def getEdges: List[(N, E, N)] = {
+    // Sorts the edges so that we can keep the order for mapEdge
+    edges.toList.sortBy(indexs(_))
   }
 
   /**
@@ -180,8 +197,8 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
    * Current graph is changed.
    */
   def addGraph(other: Graph[N, E]): Graph[N, E] = {
-    (vertices ++ other.vertices).foreach(addVertex(_))
-    (edges ++ other.edges).foreach(edge => addEdge(edge._1, edge._2, edge._3))
+    (getVertices ++ other.getVertices).foreach(addVertex)
+    (getEdges ++ other.getEdges).foreach(edge => addEdge(edge._1, edge._2, edge._3))
     this
   }
 
@@ -189,15 +206,15 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
    * clone the graph
    */
   def copy: Graph[N, E] = {
-    new Graph(vertices, edges)
+    new Graph(getVertices, getEdges)
   }
 
   /**
    * check empty
    */
   def isEmpty: Boolean = {
-    val vertexCount = vertices.size
-    val edgeCount = edges.length
+    val vertexCount = getVertices.size
+    val edgeCount = getEdges.length
     if (vertexCount + edgeCount == 0) {
       true
     } else {
@@ -233,8 +250,8 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
   }
 
   private def removeZeroInDegree: List[N] = {
-    val toBeRemoved = vertices.filter(inDegreeOf(_) == 0).sortBy(_indexs(_))
-    toBeRemoved.foreach(removeVertex(_))
+    val toBeRemoved = getVertices.filter(inDegreeOf(_) == 0)
+    toBeRemoved.foreach(removeVertex)
     toBeRemoved
   }
 
@@ -243,13 +260,38 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
    * The node returned by Iterator is stable sorted.
    */
   def topologicalOrderIterator: Iterator[N] = {
-    val newGraph = copy
-    var output = List.empty[N]
+    tryTopologicalOrderIterator match {
+      case Success(iterator) => iterator
+      case Failure(_) =>
+        LOG.warn("Please note this graph is cyclic.")
+        topologicalOrderWithCirclesIterator
+    }
+  }
 
-    while (!newGraph.isEmpty) {
-      output ++= newGraph.removeZeroInDegree
+  private def tryTopologicalOrderIterator: Try[Iterator[N]] = {
+    Try {
+      var indegreeMap = getVertices.map(v => v -> inDegreeOf(v)).toMap
+
+      val verticesWithZeroIndegree = mutable.Queue(indegreeMap.filter(_._2 == 0).keys
+        .toList.sortBy(indexs(_)): _*)
+      var output = List.empty[N]
+      var count = 0
+      while (verticesWithZeroIndegree.nonEmpty) {
+        val vertice = verticesWithZeroIndegree.dequeue()
+        adjacentVertices(vertice).foreach { adjacentV =>
+          indegreeMap += adjacentV -> (indegreeMap(adjacentV) - 1)
+          if (indegreeMap(adjacentV) == 0) {
+            verticesWithZeroIndegree.enqueue(adjacentV)
+          }
+        }
+        output :+= vertice
+        count += 1
+      }
+      if (count != getVertices.size) {
+        throw new Exception("There exists a cycle in the graph")
+      }
+      output.iterator
     }
-    output.iterator
   }
 
   /**
@@ -278,18 +320,18 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
         edge => {
           if (!indexMap.contains(edge._3)) {
             tarjan(edge._3)
-            if (lowLink.get(edge._3).get < lowLink.get(node).get) {
+            if (lowLink(edge._3) < lowLink(node)) {
               lowLink(node) = lowLink(edge._3)
             }
           } else {
-            if (inStack.get(edge._3).get && (indexMap.get(edge._3).get < lowLink.get(node).get)) {
+            if (inStack(edge._3) && (indexMap(edge._3) < lowLink(node))) {
               lowLink(node) = indexMap(edge._3)
             }
           }
         }
       }
 
-      if (indexMap.get(node).get == lowLink.get(node).get) {
+      if (indexMap(node) == lowLink(node)) {
         val circle = mutable.MutableList.empty[N]
         var n = node
         do {
@@ -301,7 +343,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
       }
     }
 
-    vertices.foreach {
+    getVertices.foreach {
       node => {
         if (!indexMap.contains(node)) tarjan(node)
       }
@@ -318,12 +360,8 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
    * http://www.drdobbs.com/database/topological-sorting/184410262
    */
   def topologicalOrderWithCirclesIterator: Iterator[N] = {
-    if (hasCycle()) {
-      val topo = getAcyclicCopy().topologicalOrderIterator
-      topo.flatMap(_.sortBy(_indexs(_)).iterator)
-    } else {
-      topologicalOrderIterator
-    }
+    val topo = getAcyclicCopy().topologicalOrderIterator
+    topo.flatMap(_.sortBy(indexs(_)).iterator)
   }
 
   private def getAcyclicCopy(): Graph[mutable.MutableList[N], E] = {
@@ -337,13 +375,13 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
 
     for (circle1 <- circles; circle2 <- circles; if circle1 != circle2) yield {
       for (node1 <- circle1; node2 <- circle2) yield {
-        var edges = outgoingEdgesOf(node1)
-        for (edge <- edges; if edge._3 == node2) yield {
+        var outgoingEdges = outgoingEdgesOf(node1)
+        for (edge <- outgoingEdges; if edge._3 == node2) yield {
           newGraph.addEdge(circle1, edge._2, circle2)
         }
 
-        edges = outgoingEdgesOf(node2)
-        for (edge <- edges; if edge._3 == node1) yield {
+        outgoingEdges = outgoingEdgesOf(node2)
+        for (edge <- outgoingEdges; if edge._3 == node1) yield {
           newGraph.addEdge(circle2, edge._2, circle1)
         }
       }
@@ -355,26 +393,14 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
    * check whether there is a loop
    */
   def hasCycle(): Boolean = {
-    @tailrec
-    def detectCycle(graph: Graph[N, E]): Boolean = {
-      if (graph.edges.isEmpty) {
-        false
-      } else if (graph.vertices.nonEmpty && !graph.vertices.exists(graph.inDegreeOf(_) == 0)) {
-        true
-      } else {
-        graph.removeZeroInDegree
-        detectCycle(graph)
-      }
-    }
-
-    detectCycle(copy)
+    tryTopologicalOrderIterator.isFailure
   }
 
   /**
    * Check whether there are two edges connecting two nodes.
    */
   def hasDuplicatedEdge(): Boolean = {
-    edges.groupBy(edge => (edge._1, edge._3)).values.exists(_.size > 1)
+    getEdges.groupBy(edge => (edge._1, edge._3)).values.exists(_.size > 1)
   }
 
   /**
@@ -391,7 +417,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
       val toBeRemovedLists = newGraph.removeZeroInDegree
       val maxLength = toBeRemovedLists.map(_.length).max
       for (subGraph <- toBeRemovedLists) {
-        val sorted = subGraph.sortBy(_indexs)
+        val sorted = subGraph.sortBy(indexs)
         for (i <- sorted.indices) {
           output += sorted(i) -> (level + i)
         }
@@ -402,8 +428,8 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
   }
 
   override def toString: String = {
-    Map("vertices" -> vertices.mkString(","),
-      "edges" -> edges.mkString(",")).toString()
+    Map("vertices" -> getVertices.mkString(","),
+      "edges" -> getEdges.mkString(",")).toString()
   }
 }
 
@@ -436,7 +462,7 @@ object Graph {
   }
 
   def unapply[N, E](graph: Graph[N, E]): Option[(List[N], List[(N, E, N)])] = {
-    Some((graph.vertices, graph.edges))
+    Some((graph.getVertices, graph.getEdges))
   }
 
   def empty[N, E]: Graph[N, E] = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala b/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala
index 6663513..256ac9a 100644
--- a/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala
+++ b/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala
@@ -34,7 +34,7 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers {
   property("Graph with no edges should be built correctly") {
     val vertexSet = Set("A", "B", "C")
     val graph = Graph(vertexSet.toSeq.map(Node): _*)
-    graph.vertices.toSet shouldBe vertexSet
+    graph.getVertices.toSet shouldBe vertexSet
   }
 
   property("Graph with vertices and edges should be built correctly") {
@@ -67,7 +67,7 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers {
     }
 
     val graph: Graph[Vertex, Edge] = Graph(graphElements: _*)
-    graph.vertices should contain theSameElementsAs vertices
+    graph.getVertices should contain theSameElementsAs vertices
 
     0.until(vertices.size).foreach { i =>
       val v = vertices(i)
@@ -129,7 +129,7 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers {
     val newGraph = graph.copy
     newGraph.addVertex("C")
 
-    assert(!graph.vertices.toSet.contains("C"), "Graph should be immutable")
+    assert(!graph.getVertices.toSet.contains("C"), "Graph should be immutable")
   }
 
   property("subGraph should return a sub-graph for certain vertex") {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala
index e033bf1..2f0bbf2 100644
--- a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala
+++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala
@@ -37,13 +37,13 @@ class PageRankWorker(taskContext: TaskContext, conf: UserConfig) extends Task(ta
 
   private val graph = conf.getValue[Graph[NodeWithTaskId[_], AnyRef]](PageRankApplication.DAG).get
 
-  private val node = graph.vertices.find { node =>
+  private val node = graph.getVertices.find { node =>
     node.taskId == taskContext.taskId.index
   }.get
 
   private val downstream = graph.outgoingEdgesOf(node).map(_._3)
     .map(id => taskId.copy(index = id.taskId)).toSeq
-  private val upstreamCount = graph.incomingEdgesOf(node).map(_._1).length
+  private val upstreamCount = graph.incomingEdgesOf(node).map(_._1).size
 
   LOG.info(s"downstream nodes: $downstream")
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
index d764331..ed5a10d 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
@@ -69,7 +69,7 @@ class GraphPartitioner(strategy: Strategy) {
     val local = Graph.empty[Module, Edge]
     val remote = Graph.empty[Module, Edge]
 
-    graph.vertices.foreach{ module =>
+    graph.getVertices.foreach{ module =>
       if (tags(module) == Local) {
         local.addVertex(module)
       } else {
@@ -77,7 +77,7 @@ class GraphPartitioner(strategy: Strategy) {
       }
     }
 
-    graph.edges.foreach{ nodeEdgeNode =>
+    graph.getEdges.foreach{ nodeEdgeNode =>
       val (node1, edge, node2) = nodeEdgeNode
       (tags(node1), tags(node2)) match {
         case (Local, Local) =>
@@ -115,14 +115,14 @@ class GraphPartitioner(strategy: Strategy) {
   }
 
   private def tag(graph: Graph[Module, Edge], strategy: Strategy): Map[Module, Location] = {
-    graph.vertices.map{vertex =>
+    graph.getVertices.map{ vertex =>
       vertex -> strategy.apply(vertex)
     }.toMap
   }
 
   private def removeDummyModule(inputGraph: Graph[Module, Edge]): Graph[Module, Edge] = {
     val graph = inputGraph.copy
-    val dummies = graph.vertices.filter {module =>
+    val dummies = graph.getVertices.filter { module =>
       module match {
         case dummy: DummyModule =>
           true

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
index 477f4d3..7d07ca8 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
@@ -210,10 +210,10 @@ case class LocalMaterializerImpl (
 
   def buildToplevelModule(graph: GGraph[Module, Edge]): Module = {
     var moduleInProgress: Module = EmptyModule
-    graph.vertices.foreach(module => {
+    graph.getVertices.foreach(module => {
       moduleInProgress = moduleInProgress.compose(module)
     })
-    graph.edges.foreach(value => {
+    graph.getEdges.foreach(value => {
       val (node1, edge, node2) = value
       moduleInProgress = moduleInProgress.wire(edge.from, edge.to)
     })
@@ -232,7 +232,7 @@ case class LocalMaterializerImpl (
       session.materializeAtomic(module.asInstanceOf[AtomicModule], module.attributes, matV)
       matV.get(module)
     }
-    materializedGraph.edges.foreach { nodeEdgeNode =>
+    materializedGraph.getEdges.foreach { nodeEdgeNode =>
       val (node1, edge, node2) = nodeEdgeNode
       val from = edge.from
       val to = edge.to
@@ -248,7 +248,7 @@ case class LocalMaterializerImpl (
         case _ =>
       }
     }
-    val matValSources = graph.vertices.flatMap(module => {
+    val matValSources = graph.getVertices.flatMap(module => {
       val rt: Option[MaterializedValueSource[_]] = module match {
         case graphStage: GraphStageModule =>
           graphStage.stage match {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
index a62b8e3..a2a5185 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
@@ -86,7 +86,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
 
   private def junctionConfig(processorIds: Map[Module, ProcessorId]):
   Map[ProcessorId, UserConfig] = {
-    val updatedConfigs = graph.vertices.flatMap { vertex =>
+    val updatedConfigs = graph.getVertices.flatMap { vertex =>
       buildShape(vertex, processorIds)
     }.toMap
     updatedConfigs
@@ -119,7 +119,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
   Map[Module, ProcessorId] = {
     ids.flatMap { kv =>
       val (module, id) = kv
-      val processorId = app.dag.vertices.find { processor =>
+      val processorId = app.dag.getVertices.find { processor =>
         processor.taskConf.getString(id).isDefined
       }.map(_.id)
       processorId.map((module, _))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala
index 9cf5009..fb89268 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala
@@ -45,8 +45,8 @@ class GraphBuilderSpec extends WordSpec with Matchers with MockitoSugar {
 
       val graph = GraphBuilder.build(topology)
 
-      graph.edges.size shouldBe 1
-      val (from, edge, to) = graph.edges.head
+      graph.getEdges.size shouldBe 1
+      val (from, edge, to) = graph.getEdges.head
       from shouldBe sourceProcessor
       edge shouldBe a[StormPartitioner]
       to shouldBe targetProcessor

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala
index 2bf6b6b..c5b3990 100644
--- a/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala
@@ -43,8 +43,8 @@ class UpickleSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
 
     val deserialized = read[Graph[Int, String]](serialized)
 
-    graph.vertices.toSet shouldBe deserialized.vertices.toSet
-    graph.edges.toSet shouldBe deserialized.edges.toSet
+    graph.getVertices.toSet shouldBe deserialized.getVertices.toSet
+    graph.getEdges.toSet shouldBe deserialized.getEdges.toSet
   }
 
   "MetricType" should "be able to serialize/deserialize correctly" in {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala
index 8ad74f8..c43af2f 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala
@@ -49,7 +49,7 @@ case class DAG(version: Int, processors : Map[ProcessorId, ProcessorDescription]
 
 object DAG {
   def apply(graph: Graph[ProcessorDescription, PartitionerDescription], version: Int = 0): DAG = {
-    val processors = graph.vertices.map { processorDescription =>
+    val processors = graph.getVertices.map { processorDescription =>
       (processorDescription.id, processorDescription)
     }.toMap
     val dag = graph.mapVertex { processor =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index f15e1b3..60a3897 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -150,11 +150,7 @@ object StreamApplication {
       name: String, dag: Graph[T, P], userConfig: UserConfig): StreamApplication = {
     import org.apache.gearpump.streaming.Processor._
 
-    if (dag.hasCycle()) {
-      LOG.warn(s"Detected cycles in DAG of application $name!")
-    }
-
-    val indices = dag.topologicalOrderWithCirclesIterator.toList.zipWithIndex.toMap
+    val indices = dag.topologicalOrderIterator.toList.zipWithIndex.toMap
     val graph = dag.mapVertex { processor =>
       val updatedProcessor = ProcessorToProcessorDescription(indices(processor), processor)
       updatedProcessor

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
index 90141d4..57602c5 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
@@ -397,7 +397,7 @@ object ClockService {
       }
 
       if (isClockStalling) {
-        val processorId = dag.graph.topologicalOrderWithCirclesIterator.toList.find { processorId =>
+        val processorId = dag.graph.topologicalOrderIterator.toList.find { processorId =>
           val clock = processorClocks.get(processorId)
           if (clock.isDefined) {
             clock.get.min == minClock.appClock

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index 04b5337..77083f0 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -67,7 +67,7 @@ class Planner {
   private def optimize(dag: Graph[Op, OpEdge])
     (implicit system: ActorSystem): Graph[Op, OpEdge] = {
     val graph = dag.copy
-    val nodes = graph.topologicalOrderWithCirclesIterator.toList.reverse
+    val nodes = graph.topologicalOrderIterator.toList.reverse
     for (node <- nodes) {
       val outGoingEdges = graph.outgoingEdgesOf(node)
       for (edge <- outGoingEdges) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
index ccda8f0..f9e2efd 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
@@ -38,7 +38,7 @@ class DAGSpec extends PropSpec with PropertyChecks with Matchers {
       dag.processors.size shouldBe 1
       assert(dag.taskCount == parallelism)
       dag.tasks.sortBy(_.index) shouldBe (0 until parallelism).map(index => TaskId(0, index))
-      dag.graph.edges shouldBe empty
+      dag.graph.getEdges shouldBe empty
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
index 70d21b5..be4cc63 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -86,7 +86,7 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc
     val plan = planner.plan(graph)
       .mapVertex(_.description)
 
-    plan.vertices.toSet should contain theSameElementsAs
+    plan.getVertices.toSet should contain theSameElementsAs
       Set("source.globalWindows", "groupBy.globalWindows.flatMap.reduce", "processor", "sink")
     plan.outgoingEdgesOf("source.globalWindows").iterator.next()._2 shouldBe
       a[GroupByPartitioner[_, _]]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
index c8c8b9f..d43bca0 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
@@ -58,8 +58,8 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with M
     application.name shouldBe "dsl"
     val dag = application.userConfig
       .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get
-    dag.vertices.size shouldBe 2
-    dag.vertices.foreach { processor =>
+    dag.getVertices.size shouldBe 2
+    dag.getVertices.foreach { processor =>
       processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName
       if (processor.description == "A.globalWindows") {
         processor.parallelism shouldBe 2

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
index ef8f932..0b8abcd 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
@@ -85,8 +85,10 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock
     }
     val expectedDagTopology = getExpectedDagTopology
 
-    dagTopology.vertices.toSet should contain theSameElementsAs expectedDagTopology.vertices.toSet
-    dagTopology.edges.toSet should contain theSameElementsAs expectedDagTopology.edges.toSet
+    dagTopology.getVertices.toSet should
+      contain theSameElementsAs expectedDagTopology.getVertices.toSet
+    dagTopology.getEdges.toSet should
+      contain theSameElementsAs expectedDagTopology.getEdges.toSet
   }
 
   private def getExpectedDagTopology: Graph[String, String] = {