You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ap...@apache.org on 2016/04/11 20:26:15 UTC
[13/50] incubator-gearpump git commit: change topologicalOrderIterator to topologicalOrderWithCirclesIterator
change topologicalOrderIterator to topologicalOrderWithCirclesIterator
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/dcd2ca46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/dcd2ca46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/dcd2ca46
Branch: refs/heads/master
Commit: dcd2ca46335be5682cfd00e64df6a04f99ff8f82
Parents: 55135fb
Author: pangolulu <gy...@gmail.com>
Authored: Tue Feb 2 20:18:31 2016 +0800
Committer: pangolulu <gy...@gmail.com>
Committed: Tue Feb 2 20:18:31 2016 +0800
----------------------------------------------------------------------
.../src/main/scala/io/gearpump/streaming/StreamApplication.scala | 2 +-
.../main/scala/io/gearpump/streaming/appmaster/ClockService.scala | 2 +-
.../src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala | 2 +-
3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/dcd2ca46/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
index cba19a4..b68054a 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
@@ -144,7 +144,7 @@ object StreamApplication {
LOG.warn(s"Detected cycles in DAG of application $name!")
}
- val indices = dag.topologicalOrderIterator.toList.zipWithIndex.toMap
+ val indices = dag.topologicalOrderWithCirclesIterator.toList.zipWithIndex.toMap
val graph = dag.mapVertex {processor =>
val updatedProcessor = ProcessorToProcessorDescription(indices(processor), processor)
updatedProcessor
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/dcd2ca46/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
index 038620a..578a15d 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
@@ -380,7 +380,7 @@ object ClockService {
}
if (isClockStalling) {
- val processorId = dag.graph.topologicalOrderIterator.toList.find { processorId =>
+ val processorId = dag.graph.topologicalOrderWithCirclesIterator.toList.find { processorId =>
val clock = processorClocks.get(processorId)
if (clock.isDefined) {
clock.get.min == minClock.appClock
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/dcd2ca46/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala
index b3714f2..ee0818d 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala
@@ -55,7 +55,7 @@ class Planner {
private def optimize(dag: Graph[Op, OpEdge]): Graph[OpChain, OpEdge] = {
val newGraph = dag.mapVertex(op => OpChain(List(op)))
- val nodes = newGraph.topologicalOrderIterator.toList.reverse
+ val nodes = newGraph.topologicalOrderWithCirclesIterator.toList.reverse
for (node <- nodes) {
val outGoingEdges = newGraph.outgoingEdgesOf(node)
for (edge <- outGoingEdges) {