You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/11/23 15:51:25 UTC

[06/18] eagle git commit: [EAGLE-1029] fix bug in generateGroupbyMonitorMetadata

[EAGLE-1029] fix bug in generateGroupbyMonitorMetadata

https://issues.apache.org/jira/browse/EAGLE-1029

Author: Zhao, Qingwen <qi...@apache.org>

Closes #939 from qingwen220/EAGLE-1029.

(cherry picked from commit 1fb60cc762da24625f37c599c2ea89d693adad67)
Signed-off-by: Zhao, Qingwen <qi...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/d6d859fb
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/d6d859fb
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/d6d859fb

Branch: refs/heads/master
Commit: d6d859fbdbd8d3d86cd4595303a9101d0ae0dddd
Parents: cf3e0b2
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Sat May 27 16:03:19 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Sat May 27 16:03:43 2017 +0800

----------------------------------------------------------------------
 .../impl/MonitorMetadataGenerator.java          | 10 +++--
 .../apache/alert/coordinator/SchedulerTest.java | 41 ++++++++++++++++++++
 .../src/test/resources/application.conf         |  2 +-
 3 files changed, 48 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/d6d859fb/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
index fb20e66..0b63f47 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
@@ -175,10 +175,12 @@ public class MonitorMetadataGenerator {
                     routeSpec.setStreamId(partiton.getStreamId());
 
                     for (StreamWorkSlotQueue sq : ms.getQueues()) {
-                        PolicyWorkerQueue queue = new PolicyWorkerQueue();
-                        queue.setWorkers(sq.getWorkingSlots());
-                        queue.setPartition(partiton);
-                        routeSpec.addQueue(queue);
+                        if (sq.getTopoGroupStartIndex().containsKey(u.getTopoName())) {
+                            PolicyWorkerQueue queue = new PolicyWorkerQueue();
+                            queue.setWorkers(sq.getWorkingSlots());
+                            queue.setPartition(partiton);
+                            routeSpec.addQueue(queue);
+                        }
                     }
 
                     spec.addRouterSpec(routeSpec);

http://git-wip-us.apache.org/repos/asf/eagle/blob/d6d859fb/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
index 1bfdd7b..1e61de8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
@@ -63,6 +63,8 @@ public class SchedulerTest {
     private static final String TEST_POLICY_1 = "test-policy1";
     private static final String TEST_POLICY_2 = "test-policy2";
     private static final String TEST_POLICY_3 = "test-policy3";
+    private static final String TEST_POLICY_4 = "test-policy4";
+    private static final String TEST_POLICY_5 = "test-policy5";
     private static final String STREAM1 = "stream1";
     private static final String DS_NAME = "ds1";
     private static ObjectMapper mapper = new ObjectMapper();
@@ -189,6 +191,45 @@ public class SchedulerTest {
         }
     }
 
+    @Test
+    public void testMonitorMetadataGenerator() {
+        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(6, 10);
+
+        GreedyPolicyScheduler ps = new GreedyPolicyScheduler();
+
+        // First round: register four policies on STREAM1 and schedule them with policiesPerBolt set to 1.
+        InMemScheduleConext context = createScheduleContext(mgmtService);
+        createSamplePolicy(context, TEST_POLICY_1, STREAM1, PARALELLISM);
+        createSamplePolicy(context, TEST_POLICY_2, STREAM1, PARALELLISM);
+        createSamplePolicy(context, TEST_POLICY_3, STREAM1, PARALELLISM);
+        createSamplePolicy(context, TEST_POLICY_4, STREAM1, PARALELLISM);
+
+        ps.init(context, mgmtService);
+        ScheduleOption option = new ScheduleOption();
+        option.setPoliciesPerBolt(1);
+        ps.schedule(option);
+        ScheduleState state = ps.getState();
+
+        Assert.assertTrue(state.getGroupSpecs().get("topo2").getRouterSpecs().size() == 0); // with four policies, "topo2" must have no router specs
+
+        context = createScheduleContext(mgmtService); // second round: fresh context, same four policies plus a fifth
+        createSamplePolicy(context, TEST_POLICY_1, STREAM1, PARALELLISM);
+        createSamplePolicy(context, TEST_POLICY_2, STREAM1, PARALELLISM);
+        createSamplePolicy(context, TEST_POLICY_3, STREAM1, PARALELLISM);
+        createSamplePolicy(context, TEST_POLICY_4, STREAM1, PARALELLISM);
+        createSamplePolicy(context, TEST_POLICY_5, STREAM1, PARALELLISM);
+
+        ps.init(context, mgmtService);
+        ps.schedule(option);
+        state = ps.getState();
+
+        for(StreamRouterSpec spec : state.getGroupSpecs().get("topo2").getRouterSpecs()) {
+            if (spec.getStreamId().equals(STREAM1)) {
+                Assert.assertTrue(spec.getTargetQueue().size() == 1); // each STREAM1 route in "topo2" must target exactly one queue
+            }
+        }
+    }
+
     private TestTopologyMgmtService createMgmtService() {
         TestTopologyMgmtService mgmtService = new TestTopologyMgmtService();
         return mgmtService;

http://git-wip-us.apache.org/repos/asf/eagle/blob/d6d859fb/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
index 5d4da38..b92d3eb 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
@@ -15,7 +15,7 @@
 
 {
   "coordinator": {
-    "policiesPerBolt": 5,
+    "policiesPerBolt": 2,
     "boltParallelism": 5,
     "policyDefaultParallelism": 5,
     "boltLoadUpbound": 0.8,