You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/12/08 07:53:14 UTC
incubator-eagle git commit: [EAGLE-821] clear useless queue when
build schedule context
Repository: incubator-eagle
Updated Branches:
refs/heads/master 269ff147f -> 162aac84f
[EAGLE-821] clear useless queue when build schedule context
https://issues.apache.org/jira/browse/EAGLE-821
Author: Zhao, Qingwen <qi...@apache.org>
Closes #722 from qingwen220/EAGLE-821.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/162aac84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/162aac84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/162aac84
Branch: refs/heads/master
Commit: 162aac84f1d1e01d232c8a43b98d5f06280ddaa3
Parents: 269ff14
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Thu Dec 8 15:53:02 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Thu Dec 8 15:53:02 2016 +0800
----------------------------------------------------------------------
.../engine/coordinator/AlertDefinition.java | 32 +++++++++++++
.../engine/coordinator/PolicyDefinition.java | 34 ++++++-------
.../model/internal/AlertDefinitionTest.java | 50 ++++++++++++++++++++
.../coordinator/impl/GreedyPolicyScheduler.java | 2 +-
.../coordinator/impl/WorkQueueBuilder.java | 2 +-
.../provider/ScheduleContextBuilder.java | 10 ++--
.../extractor/mr/MRTopologyEntityParser.java | 37 +++++++++------
.../topology/storm/TopologyDataPersistBolt.java | 2 +-
8 files changed, 132 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/162aac84/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java
index 66579bb..66a9bce 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java
@@ -16,6 +16,8 @@
*/
package org.apache.eagle.alert.engine.coordinator;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
public class AlertDefinition {
private TemplateType templateType = TemplateType.TEXT;
private String subject;
@@ -69,4 +71,34 @@ public class AlertDefinition {
// FILE,
// HTTP
}
+
+ @Override
+ public int hashCode() {
+ // Combine the same fields that equals() compares. The append order is
+ // significant: it determines the resulting hash value, so keep it fixed.
+ HashCodeBuilder builder = new HashCodeBuilder();
+ builder.append(this.templateType).append(this.body);
+ builder.append(this.category).append(this.severity);
+ builder.append(this.subject);
+ return builder.toHashCode();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == this) {
+ return true;
+ }
+ if (!(that instanceof AlertDefinition)) {
+ return false;
+ }
+ AlertDefinition another = (AlertDefinition) that;
+ // Compare null-safely: a field without an initializer (e.g. subject) is null
+ // until its setter is called, and invoking equals() on it directly would
+ // throw a NullPointerException for partially populated definitions.
+ return java.util.Objects.equals(another.templateType, this.templateType)
+ && java.util.Objects.equals(another.body, this.body)
+ && java.util.Objects.equals(another.category, this.category)
+ && java.util.Objects.equals(another.severity, this.severity)
+ && java.util.Objects.equals(another.subject, this.subject);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/162aac84/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index 3663670..02072ad 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -135,14 +135,15 @@ public class PolicyDefinition implements Serializable {
@Override
public int hashCode() {
return new HashCodeBuilder()
- .append(name)
- .append(inputStreams)
- .append(outputStreams)
- .append(definition)
- .append(partitionSpec)
- .append(policyStatus)
- .append(parallelismHint)
- .build();
+ .append(name)
+ .append(inputStreams)
+ .append(outputStreams)
+ .append(definition)
+ .append(partitionSpec)
+ .append(policyStatus)
+ .append(parallelismHint)
+ .append(alertDefinition)
+ .build();
}
@Override
@@ -158,14 +159,15 @@ public class PolicyDefinition implements Serializable {
PolicyDefinition another = (PolicyDefinition) that;
if (Objects.equals(another.name, this.name)
- && Objects.equals(another.description, this.description)
- && CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams)
- && CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams)
- && (another.definition != null && another.definition.equals(this.definition))
- && Objects.equals(this.definition, another.definition)
- && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
- && another.policyStatus.equals(this.policyStatus)
- && another.parallelismHint == this.parallelismHint) {
+ && Objects.equals(another.description, this.description)
+ && CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams)
+ && CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams)
+ && (another.definition != null && another.definition.equals(this.definition))
+ && Objects.equals(this.definition, another.definition)
+ && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
+ && another.policyStatus.equals(this.policyStatus)
+ && another.parallelismHint == this.parallelismHint
+ && Objects.equals(another.alertDefinition, alertDefinition)) {
return true;
}
return false;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/162aac84/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/AlertDefinitionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/AlertDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/AlertDefinitionTest.java
new file mode 100644
index 0000000..4ebf37f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/AlertDefinitionTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.apache.eagle.alert.engine.coordinator.AlertDefinition;
+import org.apache.eagle.alert.engine.coordinator.AlertSeverity;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AlertDefinitionTest {
+
+ /** Definitions with identical fields are equal; changing the severity breaks equality. */
+ @Test
+ public void testEqual() {
+ AlertDefinition ad1 = new AlertDefinition();
+ ad1.setBody("body1");
+ ad1.setCategory("email");
+ ad1.setSeverity(AlertSeverity.CRITICAL);
+ ad1.setSubject("");
+
+ AlertDefinition ad2 = new AlertDefinition();
+ ad2.setBody("body1");
+ ad2.setCategory("email");
+ ad2.setSeverity(AlertSeverity.CRITICAL);
+ ad2.setSubject("");
+
+ Assert.assertTrue(ad1.equals(ad2));
+ // Equal objects must also report equal hash codes.
+ Assert.assertEquals(ad1.hashCode(), ad2.hashCode());
+
+ // Only the severity changes; every other field keeps its value.
+ ad1.setSeverity(AlertSeverity.FATAL);
+ Assert.assertFalse(ad1.equals(ad2));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/162aac84/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
index 89f7cdb..e50b64c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
@@ -306,7 +306,7 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
private boolean isBoltAvailable(AlertBoltUsage boltUsage, PolicyDefinition def) {
// overload or over policy # or already contains
if (boltUsage == null || boltUsage.getLoad() > boltLoadUpbound
- || boltUsage.getPolicies().size() > policiesPerBolt || boltUsage.getPolicies().contains(def.getName())) {
+ || boltUsage.getPolicies().size() >= policiesPerBolt || boltUsage.getPolicies().contains(def.getName())) {
return false;
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/162aac84/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
index f44f80e..a717b1c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
@@ -53,7 +53,7 @@ public class WorkQueueBuilder {
IWorkSlotStrategy strategy = new SameTopologySlotStrategy(context, stream.getStreamGroup(), mgmtService);
List<WorkSlot> slots = strategy.reserveWorkSlots(size, isDedicated, properties);
if (slots.size() < size) {
- LOG.error("allocat stream work queue failed, required size");
+ LOG.error("allocate stream work queue failed, required size");
return null;
}
StreamWorkSlotQueue queue = new StreamWorkSlotQueue(stream.getStreamGroup(), isDedicated, properties,
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/162aac84/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
index 98b598a..b8e3824 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
@@ -181,7 +181,11 @@ public class ScheduleContextBuilder {
while (it.hasNext()) {
MonitoredStream ms = it.next();
Iterator<StreamWorkSlotQueue> queueIt = ms.getQueues().iterator();
- // clean queue that underly topology is changed(removed/down)
+ Set<String> usedQueueSet = new HashSet<>();
+ assignments.values().stream().forEach(assignment -> usedQueueSet.add(assignment.getQueueId()));
+
+ // clean queues that underlying topology is changed(removed/down)
+ // clear queues that are no longer used
while (queueIt.hasNext()) {
StreamWorkSlotQueue queue = queueIt.next();
boolean deprecated = false;
@@ -193,7 +197,7 @@ public class ScheduleContextBuilder {
break;
}
}
- if (deprecated) {
+ if (deprecated || !usedQueueSet.contains(queue.getQueueId())) {
queueIt.remove();
}
}
@@ -205,7 +209,7 @@ public class ScheduleContextBuilder {
}
private List<PolicyAssignment> detectAssignmentsChange(List<PolicyAssignment> list, ScheduleState state) {
- // FIXME: duplciated build map ?
+ // FIXME: duplicated build map ?
Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>();
for (MonitoredStream ms : state.getMonitoredStreams()) {
for (StreamWorkSlotQueue q : ms.getQueues()) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/162aac84/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
index c36e9e1..088b5b3 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
@@ -133,11 +133,9 @@ public class MRTopologyEntityParser implements TopologyEntityParser {
boolean isSuccess = false;
String nodeKey;
Map<String, MRServiceTopologyAPIEntity> nmMap = new HashMap<>();
+ Map<String, Integer> statusCount = new HashMap<>();
try {
YarnNodeInfoWrapper nodeWrapper = OBJ_MAPPER.readValue(is, YarnNodeInfoWrapper.class);
- int runningNodeCount = 0;
- int lostNodeCount = 0;
- int unhealthyNodeCount = 0;
int rackWarningCount = 0;
final List<YarnNodeInfo> list = nodeWrapper.getNodes().getNode();
for (YarnNodeInfo info : list) {
@@ -151,26 +149,28 @@ public class MRTopologyEntityParser implements TopologyEntityParser {
nodeManagerEntity.setHealthReport(info.getHealthReport());
}
if (info.getState() != null) {
- final String state = info.getState().toLowerCase();
+ String state = info.getState().toLowerCase();
+ nodeManagerEntity.setStatus(state);
+ } else {
+ String state = "null";
nodeManagerEntity.setStatus(state);
- if (state.equals(TopologyConstants.NODE_MANAGER_RUNNING_STATUS)) {
- ++runningNodeCount;
- } else if (state.equals(TopologyConstants.NODE_MANAGER_LOST_STATUS)) {
- ++lostNodeCount;
- } else if (state.equals(TopologyConstants.NODE_MANAGER_UNHEALTHY_STATUS)) {
- ++unhealthyNodeCount;
- }
}
+
nodeKey = generateKey(nodeManagerEntity);
- if (nmMap.containsKey(nodeKey) && nmMap.get(nodeKey).getLastUpdateTime() < nodeManagerEntity.getLastHealthUpdate()) {
- nmMap.put(nodeKey, nodeManagerEntity);
+ if (nmMap.containsKey(nodeKey)) {
+ if (nmMap.get(nodeKey).getLastUpdateTime() < nodeManagerEntity.getLastHealthUpdate()) {
+ updateStatusCount(statusCount, nmMap.get(nodeKey).getStatus(), -1);
+ nmMap.put(nodeKey, nodeManagerEntity);
+ updateStatusCount(statusCount, nodeManagerEntity.getStatus(), 1);
+ }
} else {
nmMap.put(nodeKey, nodeManagerEntity);
+ updateStatusCount(statusCount, nodeManagerEntity.getStatus(), 1);
}
}
- LOGGER.info("Total NMs: {}, Actual NMs: {}, Running NMs: {}, lost NMs: {}, unhealthy NMs: {}", list.size(), nmMap.size(), runningNodeCount, lostNodeCount, unhealthyNodeCount);
+ LOGGER.info("Total NMs: {}, Actual NMs: {}, Details: {}", list.size(), nmMap.size(), statusCount);
- double value = runningNodeCount * 1d / nmMap.size();
+ double value = statusCount.get(NODE_MANAGER_RUNNING_STATUS) * 1d / nmMap.size();
result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NODE_MANAGER_ROLE, value, site, timestamp));
result.getSlaveNodes().addAll(nmMap.values());
isSuccess = true;
@@ -180,6 +180,13 @@ public class MRTopologyEntityParser implements TopologyEntityParser {
return isSuccess;
}
+ private void updateStatusCount(Map<String, Integer> statusCount, String status, int increment) {
+ // Adds increment to the counter kept for status; a status seen for the
+ // first time starts from zero. Map.merge replaces the manual
+ // containsKey/put/get/put sequence with a single call.
+ statusCount.merge(status, increment, Integer::sum);
+ }
+
private String extractMasterHost(String url) {
Matcher matcher = TopologyConstants.HTTP_HOST_MATCH_PATTERN.matcher(url);
if (matcher.find()) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/162aac84/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
index 1e7acb8..9be78e0 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
@@ -148,7 +148,7 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
if (!response.isSuccess()) {
LOG.error("Got exception from eagle service: " + response.getException());
} else {
- LOG.info("Successfully wrote {} metrics for {}", entities.size(), serviceName);
+ LOG.info("Successfully wrote {} metrics for {}", metrics.size(), serviceName);
}
} catch (Exception e) {
LOG.error("cannot create entities successfully", e);