You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/03/29 17:49:36 UTC

samza git commit: SAMZA-1172: Fix for the topological sort to handle single-node loop

Repository: samza
Updated Branches:
  refs/heads/master 553ce33b1 -> 6f1b3db2c


SAMZA-1172: Fix for the topological sort to handle single-node loop

In the processor graph, the topological sort failed to add nodes to the visited set during graph traversal. This caused a wrong graph to be generated for a single-node loop. This patch fixes the problem.

Also fixed the maxPartition method, which did not handle an empty collection correctly.

Added a few new unit tests for these fixes. Also adjusted the timing of the previous async commit unit tests so that they run more reliably. In the long term, we need to fix the timer inside the AsyncRunLoop tests.

Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>

Reviewers: Jacob Maes <jm...@apache.org>

Closes #100 from xinyuiscool/SAMZA-1172


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6f1b3db2
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6f1b3db2
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6f1b3db2

Branch: refs/heads/master
Commit: 6f1b3db2c4a1a2ef22ec023c6b3ed9f54766ae3e
Parents: 553ce33
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Wed Mar 29 10:49:25 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Wed Mar 29 10:49:25 2017 -0700

----------------------------------------------------------------------
 .../samza/execution/ExecutionPlanner.java       |  4 +--
 .../apache/samza/execution/ProcessorGraph.java  |  6 ++++
 .../samza/execution/TestExecutionPlanner.java   | 23 ++++++++++++++
 .../samza/execution/TestProcessorGraph.java     | 32 ++++++++++++++++++++
 .../org/apache/samza/task/TestAsyncRunLoop.java |  6 ++--
 5 files changed, 65 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6f1b3db2/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 77790a8..ca2e71e 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -302,8 +302,8 @@ public class ExecutionPlanner {
     }
   }
 
-  private static int maxPartition(Collection<StreamEdge> edges) {
-    return edges.stream().map(StreamEdge::getPartitionCount).reduce(Integer::max).get();
+  /* package private */ static int maxPartition(Collection<StreamEdge> edges) {
+    return edges.stream().map(StreamEdge::getPartitionCount).reduce(Integer::max).orElse(StreamEdge.PARTITIONS_UNKNOWN);
   }
 
   private static StreamSpec createStreamSpec(StreamEdge edge) {

http://git-wip-us.apache.org/repos/asf/samza/blob/6f1b3db2/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java b/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java
index d94a9eb..13755ae 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java
@@ -276,6 +276,10 @@ public class ProcessorGraph {
    */
   /* package private */ List<ProcessorNode> topologicalSort() {
     Collection<ProcessorNode> pnodes = nodes.values();
+    if (pnodes.size() == 1) {
+      return new ArrayList<>(pnodes);
+    }
+
     Queue<ProcessorNode> q = new ArrayDeque<>();
     Map<String, Long> indegree = new HashMap<>();
     Set<ProcessorNode> visited = new HashSet<>();
@@ -337,6 +341,7 @@ public class ProcessorGraph {
           }
           // start from the node with minimal input edge again
           q.add(minNode);
+          visited.add(minNode);
         } else {
           // all the remaining nodes should be reachable from sources
           // start from sources again to find the next node that hasn't been visited
@@ -344,6 +349,7 @@ public class ProcessorGraph {
               .filter(node -> !visited.contains(node))
               .findAny().get();
           q.add(nextNode);
+          visited.add(nextNode);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/6f1b3db2/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index fa02e04..b69eec6 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -20,6 +20,9 @@
 package org.apache.samza.execution;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -44,6 +47,7 @@ import org.apache.samza.task.TaskCoordinator;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 
@@ -279,4 +283,23 @@ public class TestExecutionPlanner {
         assertTrue(edge.getPartitionCount() == 64); // max of input1 and output1
       });
   }
+
+  @Test
+  public void testMaxPartition() {
+    Collection<StreamEdge> edges = new ArrayList<>();
+    StreamEdge edge = new StreamEdge(input1);
+    edge.setPartitionCount(2);
+    edges.add(edge);
+    edge = new StreamEdge(input2);
+    edge.setPartitionCount(32);
+    edges.add(edge);
+    edge = new StreamEdge(input3);
+    edge.setPartitionCount(16);
+    edges.add(edge);
+
+    assertEquals(ExecutionPlanner.maxPartition(edges), 32);
+
+    edges = Collections.emptyList();
+    assertEquals(ExecutionPlanner.maxPartition(edges), StreamEdge.PARTITIONS_UNKNOWN);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/6f1b3db2/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java
index 2bdf529..2f89d91 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java
@@ -27,6 +27,7 @@ import org.apache.samza.system.StreamSpec;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 
@@ -34,6 +35,8 @@ public class TestProcessorGraph {
 
   ProcessorGraph graph1;
   ProcessorGraph graph2;
+  ProcessorGraph graph3;
+  ProcessorGraph graph4;
   int streamSeq = 0;
 
   private StreamSpec genStream() {
@@ -88,6 +91,24 @@ public class TestProcessorGraph {
     graph2.addIntermediateStream(genStream(), "5", "5");
     graph2.addIntermediateStream(genStream(), "5", "7");
     graph2.addSink(genStream(), "7");
+
+    /**
+     * graph3 is a graph with self loops
+     * 1<->1 -> 2<->2
+     */
+    graph3 = new ProcessorGraph(null);
+    graph3.addSource(genStream(), "1");
+    graph3.addIntermediateStream(genStream(), "1", "1");
+    graph3.addIntermediateStream(genStream(), "1", "2");
+    graph3.addIntermediateStream(genStream(), "2", "2");
+
+    /**
+     * graph4 is a graph of single-loop node
+     * 1<->1
+     */
+    graph4 = new ProcessorGraph(null);
+    graph4.addSource(genStream(), "1");
+    graph4.addIntermediateStream(genStream(), "1", "1");
   }
 
   @Test
@@ -194,5 +215,16 @@ public class TestProcessorGraph {
     assertTrue(idxMap2.get("6") > idxMap2.get("1"));
     assertTrue(idxMap2.get("5") > idxMap2.get("4"));
     assertTrue(idxMap2.get("7") > idxMap2.get("5"));
+
+    //test graph3
+    List<ProcessorNode> sortedNodes3 = graph3.topologicalSort();
+    assertTrue(sortedNodes3.size() == 2);
+    assertEquals(sortedNodes3.get(0).getId(), "1");
+    assertEquals(sortedNodes3.get(1).getId(), "2");
+
+    //test graph4
+    List<ProcessorNode> sortedNodes4 = graph4.topologicalSort();
+    assertTrue(sortedNodes4.size() == 1);
+    assertEquals(sortedNodes4.get(0).getId(), "1");
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/6f1b3db2/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 31cbe79..60dcd26 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -47,7 +47,6 @@ import org.apache.samza.system.SystemConsumers;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.TestSystemConsumers;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import scala.Option;
 import scala.collection.JavaConversions;
@@ -575,7 +574,7 @@ public class TestAsyncRunLoop {
       });
 
     runLoop.run();
-    callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
+    callbackExecutor.awaitTermination(500, TimeUnit.MILLISECONDS);
 
     verify(offsetManager, atLeastOnce()).checkpoint(taskName0);
     assertEquals(3, task0.processed);
@@ -585,7 +584,6 @@ public class TestAsyncRunLoop {
   }
 
   @Test
-  @Ignore
   public void testProcessBehaviourWhenAsyncCommitIsEnabled() throws InterruptedException {
     TestTask task0 = new TestTask(true, true, false);
 
@@ -631,6 +629,6 @@ public class TestAsyncRunLoop {
 
     runLoop.run();
 
-    callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
+    callbackExecutor.awaitTermination(500, TimeUnit.MILLISECONDS);
   }
 }