You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/06/18 21:36:32 UTC
[incubator-heron] 01/02: enable runtime config for autobackpressure policy
This is an automated email from the ASF dual-hosted git repository.
huijun pushed a commit to branch huijunw/togglepolicy
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
commit 1fce9aff12eea2a3d545b71af3b73a5c5d09519f
Author: Huijun Wu <hu...@twitter.com>
AuthorDate: Mon Jun 18 14:25:34 2018 -0700
enable runtime config for autobackpressure policy
---
heron/config/src/yaml/conf/local/healthmgr.yaml | 18 ++--
.../healthmgr/common/PhysicalPlanProvider.java | 8 +-
.../AutoRestartBackpressureContainerPolicy.java | 3 +-
.../heron/healthmgr/policy/ToggleablePolicy.java | 115 +++++++++++++++++++++
4 files changed, 129 insertions(+), 15 deletions(-)
diff --git a/heron/config/src/yaml/conf/local/healthmgr.yaml b/heron/config/src/yaml/conf/local/healthmgr.yaml
index aa49572..562715c 100644
--- a/heron/config/src/yaml/conf/local/healthmgr.yaml
+++ b/heron/config/src/yaml/conf/local/healthmgr.yaml
@@ -18,19 +18,19 @@
# Topology health manager mode:
# disabled = do not launch the health manager
# cluster = launch the health manager on container-0
-heron.topology.healthmgr.mode: disabled
+heron.topology.healthmgr.mode: cluster
# Default class and url for providing metrics
-heron.healthmgr.metricsource.type: org.apache.heron.healthmgr.sensors.TrackerMetricsProvider
+heron.healthmgr.metricsource.type: org.apache.heron.healthmgr.sensors.MetricsCacheMetricsProvider
heron.healthmgr.metricsource.url: http://localhost:8888
# Enable MetricsCache if MetricsCache is chosen as metrics provider
-#heron.topology.metricscachemgr.mode: cluster
+heron.topology.metricscachemgr.mode: cluster
## list of policies to be executed for self regulation
-#heron.class.health.policies:
+heron.class.health.policies:
# - dynamic-resource-allocation
-# - auto-restart-backpressure-container
+ - auto-restart-backpressure-container
#
## configuration specific to individual policies listed above
#dynamic-resource-allocation:
@@ -38,7 +38,7 @@ heron.healthmgr.metricsource.url: http://localhost:8888
# health.policy.interval.ms: 120000
# BackPressureDetector.noiseFilterMillis: 20
# GrowingWaitQueueDetector.limit: 5
-#auto-restart-backpressure-container:
-# health.policy.class: org.apache.heron.healthmgr.policy.AutoRestartBackpressureContainerPolicy
-# health.policy.interval.ms: 120000
-# BackPressureDetector.noiseFilterMillis: 20
+auto-restart-backpressure-container:
+ health.policy.class: org.apache.heron.healthmgr.policy.AutoRestartBackpressureContainerPolicy
+ health.policy.interval.ms: 120000
+ BackPressureDetector.noiseFilterMillis: 20
diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java
index 79990e8..94c8aa3 100644
--- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java
+++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java
@@ -78,12 +78,12 @@ public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
@Override
public synchronized PhysicalPlan get() {
+// if (physicalPlan == null) {
+ physicalPlan = stateManagerAdaptor.getPhysicalPlan(topologyName);
if (physicalPlan == null) {
- physicalPlan = stateManagerAdaptor.getPhysicalPlan(topologyName);
- if (physicalPlan == null) {
- throw new InvalidStateException(topologyName, "Failed to fetch the physical plan");
- }
+ throw new InvalidStateException(topologyName, "Failed to fetch the physical plan");
}
+// }
return physicalPlan;
}
diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java
index f143246..4811be5 100644
--- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java
+++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java
@@ -26,7 +26,6 @@ import javax.inject.Inject;
import com.microsoft.dhalion.events.EventHandler;
import com.microsoft.dhalion.events.EventManager;
-import com.microsoft.dhalion.policy.HealthPolicyImpl;
import org.apache.heron.healthmgr.HealthPolicyConfig;
import org.apache.heron.healthmgr.common.HealthManagerEvents.ContainerRestart;
@@ -42,7 +41,7 @@ import static org.apache.heron.healthmgr.HealthPolicyConfigReader.PolicyConfigKe
* state for long time, which we believe the container cannot recover.
* 2. resolver: try to restart the backpressure container so as to be rescheduled.
*/
-public class AutoRestartBackpressureContainerPolicy extends HealthPolicyImpl
+public class AutoRestartBackpressureContainerPolicy extends ToggleablePolicy
implements EventHandler<ContainerRestart> {
private static final String CONF_WAIT_INTERVAL_MILLIS =
diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/ToggleablePolicy.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/ToggleablePolicy.java
new file mode 100644
index 0000000..282361e
--- /dev/null
+++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/ToggleablePolicy.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.healthmgr.policy;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.microsoft.dhalion.core.Action;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.policy.HealthPolicyImpl;
+
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.healthmgr.common.PhysicalPlanProvider;
+
+/**
+ * A health policy that can be switched on and off at runtime.
+ *
+ * <p>Before each sensor phase the topology config of the current physical plan is
+ * scanned for {@link #TOGGLE_RUNTIME_CONFIG_KEY}: "true"/"false" (any letter case)
+ * switches the policy on/off, any other value is logged as a warning and ignored.
+ * While switched off, the sensor, detector and diagnoser phases yield empty results.
+ */
+public class ToggleablePolicy extends HealthPolicyImpl {
+  private static final Logger LOG =
+      Logger.getLogger(ToggleablePolicy.class.getName());
+
+  public static final String TOGGLE_CONFIG_KEY = "healthmgr.toggleablepolicy.running";
+  public static final String TOGGLE_RUNTIME_CONFIG_KEY = TOGGLE_CONFIG_KEY + ":runtime";
+
+  @Inject
+  protected PhysicalPlanProvider physicalPlanProvider;
+  // Current switch state; the policy starts out enabled.
+  protected boolean running = true;
+
+  /**
+   * Refreshes the switch from the topology config, then runs the sensors if enabled.
+   *
+   * @return the parent policy's measurements, or an empty collection when switched off
+   */
+  @Override
+  public Collection<Measurement> executeSensors() {
+    for (TopologyAPI.Config.KeyValue kv
+        : physicalPlanProvider.get().getTopology().getTopologyConfig().getKvsList()) {
+      // Per-entry dump is diagnostic only; keep it below the default log level.
+      LOG.fine("kv " + kv.getKey() + " => " + kv.getValue());
+      if (kv.getKey().equals(TOGGLE_RUNTIME_CONFIG_KEY)) {
+        String val = kv.getValue();
+        if ("false".equalsIgnoreCase(val)) {
+          if (running) {
+            running = false;
+            LOG.info("policy running status changed to False");
+          }
+        } else if ("true".equalsIgnoreCase(val)) {
+          if (!running) {
+            running = true;
+            LOG.info("policy running status changed to True");
+          }
+        } else {
+          LOG.warning("unknown runtime config for `policy running status`: " + val);
+        }
+      }
+    }
+
+    return running ? super.executeSensors() : new ArrayList<Measurement>();
+  }
+
+  /** Runs the detectors when switched on; otherwise returns an empty collection. */
+  @Override
+  public Collection<Symptom> executeDetectors(Collection<Measurement> measurements) {
+    return running ? super.executeDetectors(measurements) : new ArrayList<Symptom>();
+  }
+
+  /** Runs the diagnosers when switched on; otherwise returns an empty collection. */
+  @Override
+  public Collection<Diagnosis> executeDiagnosers(Collection<Symptom> symptoms) {
+    return running ? super.executeDiagnosers(symptoms) : new ArrayList<Diagnosis>();
+  }
+
+  /**
+   * Runs the resolvers on the given diagnoses.
+   *
+   * <p>NOTE(review): unlike the other phases, a switched-off policy still delegates to
+   * the parent, passing an empty diagnosis list instead of returning an empty result.
+   * Presumably this keeps the parent's per-cycle handling running; confirm before changing.
+   *
+   * @param diagnosis diagnoses to resolve; replaced by an empty list while switched off
+   * @return whatever the parent policy returns for the (possibly emptied) diagnosis list
+   */
+  @Override
+  public Collection<Action> executeResolvers(Collection<Diagnosis> diagnosis) {
+    return super.executeResolvers(running ? diagnosis : new ArrayList<Diagnosis>());
+  }
+}