You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/06/18 21:36:32 UTC

[incubator-heron] 01/02: enable runtime config for autobackpressure policy

This is an automated email from the ASF dual-hosted git repository.

huijun pushed a commit to branch huijunw/togglepolicy
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git

commit 1fce9aff12eea2a3d545b71af3b73a5c5d09519f
Author: Huijun Wu <hu...@twitter.com>
AuthorDate: Mon Jun 18 14:25:34 2018 -0700

    enable runtime config for autobackpressure policy
---
 heron/config/src/yaml/conf/local/healthmgr.yaml    |  18 ++--
 .../healthmgr/common/PhysicalPlanProvider.java     |   8 +-
 .../AutoRestartBackpressureContainerPolicy.java    |   3 +-
 .../heron/healthmgr/policy/ToggleablePolicy.java   | 115 +++++++++++++++++++++
 4 files changed, 129 insertions(+), 15 deletions(-)

diff --git a/heron/config/src/yaml/conf/local/healthmgr.yaml b/heron/config/src/yaml/conf/local/healthmgr.yaml
index aa49572..562715c 100644
--- a/heron/config/src/yaml/conf/local/healthmgr.yaml
+++ b/heron/config/src/yaml/conf/local/healthmgr.yaml
@@ -18,19 +18,19 @@
 # Topology health manager mode:
 # disabled = do not launch the health manager
 # cluster = launch the health manager on container-0
-heron.topology.healthmgr.mode: disabled
+heron.topology.healthmgr.mode: cluster
 
 # Default class and url for providing metrics
-heron.healthmgr.metricsource.type: org.apache.heron.healthmgr.sensors.TrackerMetricsProvider
+heron.healthmgr.metricsource.type: org.apache.heron.healthmgr.sensors.MetricsCacheMetricsProvider
 heron.healthmgr.metricsource.url: http://localhost:8888
 
 # Enable MetricsCache if MetricsCache is chosen as metrics provider
-#heron.topology.metricscachemgr.mode: cluster
+heron.topology.metricscachemgr.mode: cluster
 
 ## list of policies to be executed for self regulation
-#heron.class.health.policies:
+heron.class.health.policies:
 #  - dynamic-resource-allocation
-#  - auto-restart-backpressure-container
+  - auto-restart-backpressure-container
 #
 ## configuration specific to individual policies listed above
 #dynamic-resource-allocation:
@@ -38,7 +38,7 @@ heron.healthmgr.metricsource.url: http://localhost:8888
 #  health.policy.interval.ms: 120000
 #  BackPressureDetector.noiseFilterMillis: 20
 #  GrowingWaitQueueDetector.limit: 5
-#auto-restart-backpressure-container:
-#  health.policy.class: org.apache.heron.healthmgr.policy.AutoRestartBackpressureContainerPolicy
-#  health.policy.interval.ms: 120000
-#  BackPressureDetector.noiseFilterMillis: 20
+auto-restart-backpressure-container:
+  health.policy.class: org.apache.heron.healthmgr.policy.AutoRestartBackpressureContainerPolicy
+  health.policy.interval.ms: 120000
+  BackPressureDetector.noiseFilterMillis: 20
diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java
index 79990e8..94c8aa3 100644
--- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java
+++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java
@@ -78,12 +78,12 @@ public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
 
   @Override
   public synchronized PhysicalPlan get() {
+//    if (physicalPlan == null) {
+    physicalPlan = stateManagerAdaptor.getPhysicalPlan(topologyName);
     if (physicalPlan == null) {
-      physicalPlan = stateManagerAdaptor.getPhysicalPlan(topologyName);
-      if (physicalPlan == null) {
-        throw new InvalidStateException(topologyName, "Failed to fetch the physical plan");
-      }
+      throw new InvalidStateException(topologyName, "Failed to fetch the physical plan");
     }
+//    }
     return physicalPlan;
   }
 
diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java
index f143246..4811be5 100644
--- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java
+++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java
@@ -26,7 +26,6 @@ import javax.inject.Inject;
 
 import com.microsoft.dhalion.events.EventHandler;
 import com.microsoft.dhalion.events.EventManager;
-import com.microsoft.dhalion.policy.HealthPolicyImpl;
 
 import org.apache.heron.healthmgr.HealthPolicyConfig;
 import org.apache.heron.healthmgr.common.HealthManagerEvents.ContainerRestart;
@@ -42,7 +41,7 @@ import static org.apache.heron.healthmgr.HealthPolicyConfigReader.PolicyConfigKe
  * state for long time, which we believe the container cannot recover.
  * 2. resolver: try to restart the backpressure container so as to be rescheduled.
  */
-public class AutoRestartBackpressureContainerPolicy extends HealthPolicyImpl
+public class AutoRestartBackpressureContainerPolicy extends ToggleablePolicy
     implements EventHandler<ContainerRestart> {
 
   private static final String CONF_WAIT_INTERVAL_MILLIS =
diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/ToggleablePolicy.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/ToggleablePolicy.java
new file mode 100644
index 0000000..282361e
--- /dev/null
+++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/ToggleablePolicy.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.healthmgr.policy;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.microsoft.dhalion.core.Action;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.policy.HealthPolicyImpl;
+
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.healthmgr.common.PhysicalPlanProvider;
+
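+/**
+ * Base class for health policies that can be switched on and off at runtime.
+ * 1. keeps a {@code running} switch, enabled by default
+ * 2. updates the switch from the topology config key {@link #TOGGLE_RUNTIME_CONFIG_KEY}
+ */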
+public class ToggleablePolicy extends HealthPolicyImpl {
+  private static final Logger LOG =
+      Logger.getLogger(ToggleablePolicy.class.getName());
+
+  public static final String TOGGLE_CONFIG_KEY = "healthmgr.toggleablepolicy.running";
+  public static final String TOGGLE_RUNTIME_CONFIG_KEY = TOGGLE_CONFIG_KEY + ":runtime";
+
+  @Inject
+  protected PhysicalPlanProvider physicalPlanProvider;
+  protected boolean running = true; // the policy starts out enabled
+
+//  @Inject
+//  ToggleablePolicy(PhysicalPlanProvider physicalPlanProvider) {
+//    running = true;
+//    this.physicalPlanProvider = physicalPlanProvider;
+//  }
+
+  @Override
+  public Collection<Measurement> executeSensors() {
+    for (TopologyAPI.Config.KeyValue kv
+        : physicalPlanProvider.get().getTopology().getTopologyConfig().getKvsList()) {
+      LOG.info("kv " + kv.getKey() + " => " + kv.getValue()); // logged for every entry
+      if (kv.getKey().equals(TOGGLE_RUNTIME_CONFIG_KEY)) { // the toggle entry
+        String val = kv.getValue();
+        if ("False".equals(val) || "false".equals(val)) { // exact spellings only
+          if (running) { // log only on an actual state change
+            running = false;
+            LOG.info("policy running status changed to False");
+          }
+        } else if ("True".equals(val) || "true".equals(val)) {
+          if (!running) {
+            running = true;
+            LOG.info("policy running status changed to True");
+          }
+        } else { // any other value leaves the switch untouched
+          LOG.warning("unknown runtime config for `policy running status`: " + val);
+        }
+      }
+    }
+
+    if (running) {
+      return super.executeSensors();
+    } else {
+      return new ArrayList<Measurement>(); // disabled: sensors are skipped
+    }
+  }
+
+  @Override
+  public Collection<Symptom> executeDetectors(Collection<Measurement> measurements) {
+    if (running) {
+      return super.executeDetectors(measurements);
+    } else {
+      return new ArrayList<Symptom>(); // disabled: detectors are skipped
+    }
+  }
+
+  @Override
+  public Collection<Diagnosis> executeDiagnosers(Collection<Symptom> symptoms) {
+    if (running) {
+      return super.executeDiagnosers(symptoms);
+    } else {
+      return new ArrayList<Diagnosis>(); // disabled: diagnosers are skipped
+    }
+  }
+
+  @Override
+  public Collection<Action> executeResolvers(Collection<Diagnosis> diagnosis) {
+    if (running) {
+      return super.executeResolvers(diagnosis);
+    } else {
+      return super.executeResolvers(new ArrayList<Diagnosis>()); // still delegates, with none
+    }
+  }
+}