You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/12/08 07:53:14 UTC

incubator-eagle git commit: [EAGLE-821] clear useless queue when build schedule context

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 269ff147f -> 162aac84f


[EAGLE-821] clear useless queue when build schedule context

https://issues.apache.org/jira/browse/EAGLE-821

Author: Zhao, Qingwen <qi...@apache.org>

Closes #722 from qingwen220/EAGLE-821.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/162aac84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/162aac84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/162aac84

Branch: refs/heads/master
Commit: 162aac84f1d1e01d232c8a43b98d5f06280ddaa3
Parents: 269ff14
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Thu Dec 8 15:53:02 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Thu Dec 8 15:53:02 2016 +0800

----------------------------------------------------------------------
 .../engine/coordinator/AlertDefinition.java     | 32 +++++++++++++
 .../engine/coordinator/PolicyDefinition.java    | 34 ++++++-------
 .../model/internal/AlertDefinitionTest.java     | 50 ++++++++++++++++++++
 .../coordinator/impl/GreedyPolicyScheduler.java |  2 +-
 .../coordinator/impl/WorkQueueBuilder.java      |  2 +-
 .../provider/ScheduleContextBuilder.java        | 10 ++--
 .../extractor/mr/MRTopologyEntityParser.java    | 37 +++++++++------
 .../topology/storm/TopologyDataPersistBolt.java |  2 +-
 8 files changed, 132 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/162aac84/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java
index 66579bb..66a9bce 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java
@@ -16,6 +16,8 @@
  */
 package org.apache.eagle.alert.engine.coordinator;
 
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
 public class AlertDefinition {
     private TemplateType templateType = TemplateType.TEXT;
     private String subject;
@@ -69,4 +71,34 @@ public class AlertDefinition {
         // FILE,
         // HTTP
     }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder()
+                .append(templateType)
+                .append(this.body)
+                .append(this.category)
+                .append(this.severity)
+                .append(this.subject)
+                .build();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+        if (that == this) {
+            return true;
+        }
+        if (!(that instanceof AlertDefinition)) {
+            return false;
+        }
+        AlertDefinition another = (AlertDefinition) that;
+        if (another.templateType.equals(this.templateType)
+                && another.body.equals(this.body)
+                && another.category.equals(this.category)
+                && another.severity.equals(this.severity)
+                && another.subject.equals(this.subject)) {
+            return true;
+        }
+        return false;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/162aac84/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index 3663670..02072ad 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -135,14 +135,15 @@ public class PolicyDefinition implements Serializable {
     @Override
     public int hashCode() {
         return new HashCodeBuilder()
-            .append(name)
-            .append(inputStreams)
-            .append(outputStreams)
-            .append(definition)
-            .append(partitionSpec)
-            .append(policyStatus)
-            .append(parallelismHint)
-            .build();
+                .append(name)
+                .append(inputStreams)
+                .append(outputStreams)
+                .append(definition)
+                .append(partitionSpec)
+                .append(policyStatus)
+                .append(parallelismHint)
+                .append(alertDefinition)
+                .build();
     }
 
     @Override
@@ -158,14 +159,15 @@ public class PolicyDefinition implements Serializable {
         PolicyDefinition another = (PolicyDefinition) that;
 
         if (Objects.equals(another.name, this.name)
-            && Objects.equals(another.description, this.description)
-            && CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams)
-            && CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams)
-            && (another.definition != null && another.definition.equals(this.definition))
-            && Objects.equals(this.definition, another.definition)
-            && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
-            && another.policyStatus.equals(this.policyStatus)
-            && another.parallelismHint == this.parallelismHint) {
+                && Objects.equals(another.description, this.description)
+                && CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams)
+                && CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams)
+                && (another.definition != null && another.definition.equals(this.definition))
+                && Objects.equals(this.definition, another.definition)
+                && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
+                && another.policyStatus.equals(this.policyStatus)
+                && another.parallelismHint == this.parallelismHint
+                && Objects.equals(another.alertDefinition, alertDefinition)) {
             return true;
         }
         return false;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/162aac84/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/AlertDefinitionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/AlertDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/AlertDefinitionTest.java
new file mode 100644
index 0000000..4ebf37f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/AlertDefinitionTest.java
@@ -0,0 +1,50 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.apache.eagle.alert.engine.coordinator.AlertDefinition;
+import org.apache.eagle.alert.engine.coordinator.AlertSeverity;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AlertDefinitionTest {
+
+    @Test
+    public void testEqual() {
+        AlertDefinition ad1 = new AlertDefinition();
+        ad1.setBody("body1");
+        ad1.setCategory("email");
+        ad1.setSeverity(AlertSeverity.CRITICAL);
+        ad1.setSubject("");
+
+        AlertDefinition ad2 = new AlertDefinition();
+        ad2.setBody("body1");
+        ad2.setCategory("email");
+        ad2.setSeverity(AlertSeverity.CRITICAL);
+        ad2.setSubject("");
+
+        Assert.assertTrue(ad1.equals(ad2));
+
+        ad1.setBody("body1");
+        ad1.setCategory("email");
+        ad1.setSeverity(AlertSeverity.FATAL);
+        ad1.setSubject("");
+
+        Assert.assertTrue(!ad1.equals(ad2));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/162aac84/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
index 89f7cdb..e50b64c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
@@ -306,7 +306,7 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
     private boolean isBoltAvailable(AlertBoltUsage boltUsage, PolicyDefinition def) {
         // overload or over policy # or already contains
         if (boltUsage == null || boltUsage.getLoad() > boltLoadUpbound
-            || boltUsage.getPolicies().size() > policiesPerBolt || boltUsage.getPolicies().contains(def.getName())) {
+            || boltUsage.getPolicies().size() >= policiesPerBolt || boltUsage.getPolicies().contains(def.getName())) {
             return false;
         }
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/162aac84/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
index f44f80e..a717b1c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
@@ -53,7 +53,7 @@ public class WorkQueueBuilder {
         IWorkSlotStrategy strategy = new SameTopologySlotStrategy(context, stream.getStreamGroup(), mgmtService);
         List<WorkSlot> slots = strategy.reserveWorkSlots(size, isDedicated, properties);
         if (slots.size() < size) {
-            LOG.error("allocat stream work queue failed, required size");
+            LOG.error("allocate stream work queue failed, required size");
             return null;
         }
         StreamWorkSlotQueue queue = new StreamWorkSlotQueue(stream.getStreamGroup(), isDedicated, properties,

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/162aac84/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
index 98b598a..b8e3824 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
@@ -181,7 +181,11 @@ public class ScheduleContextBuilder {
         while (it.hasNext()) {
             MonitoredStream ms = it.next();
             Iterator<StreamWorkSlotQueue> queueIt = ms.getQueues().iterator();
-            // clean queue that underly topology is changed(removed/down)
+            Set<String> usedQueueSet = new HashSet<>();
+            assignments.values().stream().forEach(assignment -> usedQueueSet.add(assignment.getQueueId()));
+
+            // clean queues that underlying topology is changed(removed/down)
+            // clear queues that are no longer used
             while (queueIt.hasNext()) {
                 StreamWorkSlotQueue queue = queueIt.next();
                 boolean deprecated = false;
@@ -193,7 +197,7 @@ public class ScheduleContextBuilder {
                         break;
                     }
                 }
-                if (deprecated) {
+                if (deprecated || !usedQueueSet.contains(queue.getQueueId())) {
                     queueIt.remove();
                 }
             }
@@ -205,7 +209,7 @@ public class ScheduleContextBuilder {
     }
 
     private List<PolicyAssignment> detectAssignmentsChange(List<PolicyAssignment> list, ScheduleState state) {
-        // FIXME: duplciated build map ?
+        // FIXME: duplicated build map ?
         Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>();
         for (MonitoredStream ms : state.getMonitoredStreams()) {
             for (StreamWorkSlotQueue q : ms.getQueues()) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/162aac84/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
index c36e9e1..088b5b3 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
@@ -133,11 +133,9 @@ public class MRTopologyEntityParser implements TopologyEntityParser {
         boolean isSuccess = false;
         String nodeKey;
         Map<String, MRServiceTopologyAPIEntity> nmMap = new HashMap<>();
+        Map<String, Integer> statusCount = new HashMap<>();
         try {
             YarnNodeInfoWrapper nodeWrapper = OBJ_MAPPER.readValue(is, YarnNodeInfoWrapper.class);
-            int runningNodeCount = 0;
-            int lostNodeCount = 0;
-            int unhealthyNodeCount = 0;
             int rackWarningCount = 0;
             final List<YarnNodeInfo> list = nodeWrapper.getNodes().getNode();
             for (YarnNodeInfo info : list) {
@@ -151,26 +149,28 @@ public class MRTopologyEntityParser implements TopologyEntityParser {
                     nodeManagerEntity.setHealthReport(info.getHealthReport());
                 }
                 if (info.getState() != null) {
-                    final String state = info.getState().toLowerCase();
+                    String state = info.getState().toLowerCase();
+                    nodeManagerEntity.setStatus(state);
+                } else {
+                    String state = "null";
                     nodeManagerEntity.setStatus(state);
-                    if (state.equals(TopologyConstants.NODE_MANAGER_RUNNING_STATUS)) {
-                        ++runningNodeCount;
-                    } else if (state.equals(TopologyConstants.NODE_MANAGER_LOST_STATUS)) {
-                        ++lostNodeCount;
-                    } else if (state.equals(TopologyConstants.NODE_MANAGER_UNHEALTHY_STATUS)) {
-                        ++unhealthyNodeCount;
-                    }
                 }
+
                 nodeKey = generateKey(nodeManagerEntity);
-                if (nmMap.containsKey(nodeKey) && nmMap.get(nodeKey).getLastUpdateTime() < nodeManagerEntity.getLastHealthUpdate()) {
-                    nmMap.put(nodeKey, nodeManagerEntity);
+                if (nmMap.containsKey(nodeKey)) {
+                    if (nmMap.get(nodeKey).getLastUpdateTime() < nodeManagerEntity.getLastHealthUpdate()) {
+                        updateStatusCount(statusCount, nmMap.get(nodeKey).getStatus(), -1);
+                        nmMap.put(nodeKey, nodeManagerEntity);
+                        updateStatusCount(statusCount, nodeManagerEntity.getStatus(), 1);
+                    }
                 } else {
                     nmMap.put(nodeKey, nodeManagerEntity);
+                    updateStatusCount(statusCount, nodeManagerEntity.getStatus(), 1);
                 }
             }
-            LOGGER.info("Total NMs: {}, Actual NMs: {}, Running NMs: {}, lost NMs: {}, unhealthy NMs: {}", list.size(), nmMap.size(), runningNodeCount, lostNodeCount, unhealthyNodeCount);
+            LOGGER.info("Total NMs: {}, Actual NMs: {}, Details: {}", list.size(), nmMap.size(), statusCount);
 
-            double value = runningNodeCount * 1d / nmMap.size();
+            double value = statusCount.get(NODE_MANAGER_RUNNING_STATUS) * 1d / nmMap.size();
             result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NODE_MANAGER_ROLE, value, site, timestamp));
             result.getSlaveNodes().addAll(nmMap.values());
             isSuccess = true;
@@ -180,6 +180,13 @@ public class MRTopologyEntityParser implements TopologyEntityParser {
         return isSuccess;
     }
 
+    private void updateStatusCount(Map<String, Integer> statusCount, String status, int increment) {
+        if (!statusCount.containsKey(status)) {
+            statusCount.put(status, 0);
+        }
+        statusCount.put(status, statusCount.get(status) + increment);
+    }
+
     private String extractMasterHost(String url) {
         Matcher matcher = TopologyConstants.HTTP_HOST_MATCH_PATTERN.matcher(url);
         if (matcher.find()) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/162aac84/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
index 1e7acb8..9be78e0 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
@@ -148,7 +148,7 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
             if (!response.isSuccess()) {
                 LOG.error("Got exception from eagle service: " + response.getException());
             } else {
-                LOG.info("Successfully wrote {} metrics for {}", entities.size(), serviceName);
+                LOG.info("Successfully wrote {} metrics for {}", metrics.size(), serviceName);
             }
         } catch (Exception e) {
             LOG.error("cannot create entities successfully", e);