You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/08/13 18:47:25 UTC

[incubator-heron] branch huijunw/refactorhealthmgr0.2.3 updated: fixcomments

This is an automated email from the ASF dual-hosted git repository.

huijun pushed a commit to branch huijunw/refactorhealthmgr0.2.3
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/huijunw/refactorhealthmgr0.2.3 by this push:
     new 4aa64a6  fixcomments
4aa64a6 is described below

commit 4aa64a6ac4c38e13cb7412ece8f4d1ae8f24df4e
Author: Huijun Wu <hu...@twitter.com>
AuthorDate: Mon Aug 13 11:47:12 2018 -0700

    fixcomments
---
 heron/executor/src/python/heron_executor.py        |  3 +-
 .../healthmgr/common/PhysicalPlanProvider.java     | 76 ++++++++++++----------
 .../heron/healthmgr/policy/ToggleablePolicy.java   |  2 +-
 3 files changed, 43 insertions(+), 38 deletions(-)

diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index c53a608..d62b7da 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -506,8 +506,7 @@ class HeronExecutor(object):
                      "--role", self.role,
                      "--environment", self.environment,
                      "--topology_name", self.topology_name,
-                     "--metricsmgr_port", self.metrics_manager_port,
-                     "--verbose"]
+                     "--metricsmgr_port", self.metrics_manager_port]
 
     return healthmgr_cmd
 
diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java
index 2047d21..a563cd0 100644
--- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java
+++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java
@@ -21,9 +21,8 @@ package org.apache.heron.healthmgr.common;
 
 import java.net.HttpURLConnection;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Base64;
-import java.util.Collection;
+import java.util.List;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
@@ -48,7 +47,8 @@ public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
   private final SchedulerStateManagerAdaptor stateManagerAdaptor;
   private final String topologyName;
 
-  private PhysicalPlan cachedPhysicalPlan = null;
+  // Cache the physical plan between two successful get() invocations.
+  private PhysicalPlan physicalPlan = null;
 
   @Inject
   public PhysicalPlanProvider(SchedulerStateManagerAdaptor stateManagerAdaptor,
@@ -57,6 +57,22 @@ public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
     this.topologyName = topologyName;
   }
 
+  protected PhysicalPlan ParseResponseToPhysicalPlan(byte[] responseData) {
+    // byte to base64 string
+    String encodedString = new String(responseData);
+    LOG.fine("tmaster returns physical plan in base64 str: " + encodedString);
+    // base64 string to proto bytes
+    byte[] decodedBytes = Base64.getDecoder().decode(encodedString);
+    // construct proto obj from bytes
+    PhysicalPlan physicalPlan = null;
+    try {
+      physicalPlan = PhysicalPlan.parseFrom(decodedBytes);
+    } catch (Exception e) {
+      throw new InvalidStateException(topologyName, "Failed to fetch the physical plan");
+    }
+    return physicalPlan;
+  }
+
   @Override
   public synchronized PhysicalPlan get() {
     TopologyMaster.TMasterLocation tMasterLocation
@@ -72,40 +88,34 @@ public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
     HttpURLConnection con = NetworkUtils.getHttpConnection(url);
     NetworkUtils.sendHttpGetRequest(con);
     byte[] responseData = NetworkUtils.readHttpResponse(con);
-    // byte to base64 string
-    String encodedString = new String(responseData);
-    LOG.fine("tmaster returns physical plan in base64 str: " + encodedString);
-    // base64 string to proto bytes
-    byte[] decodedBytes = Base64.getDecoder().decode(encodedString);
-    // construct proto obj from bytes
-    PhysicalPlan physicalPlan = null;
-    try {
-      physicalPlan = PhysicalPlan.parseFrom(decodedBytes);
-    } catch (Exception e) {
-      throw new InvalidStateException(topologyName, "Failed to fetch the physical plan");
-    }
 
-    cachedPhysicalPlan = physicalPlan;
+    physicalPlan = ParseResponseToPhysicalPlan(responseData);
     return physicalPlan;
   }
 
-  public PhysicalPlan getCachedPhysicalPlan() {
+  /**
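+   * Best-effort attempt to return the latest physical plan:
+   * 1. refresh the physical plan by calling get();
+   * 2. if get() throws InvalidStateException, fall back to the last fetched plan.
+   * @return the refreshed physical plan, or the last fetched one if the refresh fails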
+   */
+  public PhysicalPlan getPhysicalPlan() {
     try {
       get();
     } catch (InvalidStateException e) {
-      if (cachedPhysicalPlan == null) {
+      if (physicalPlan == null) {
         throw e;
       }
     }
-    return cachedPhysicalPlan;
+    return physicalPlan;
   }
 
   /**
    * A utility method to extract bolt component names from the topology.
    *
-   * @return array of all bolt names
+   * @return list of all bolt names
    */
-  protected Collection<String> getBoltNames(PhysicalPlan pp) {
+  public List<String> getBoltNames(PhysicalPlan pp) {
     TopologyAPI.Topology localTopology = pp.getTopology();
     ArrayList<String> boltNames = new ArrayList<>();
     for (TopologyAPI.Bolt bolt : localTopology.getBoltsList()) {
@@ -114,17 +124,18 @@ public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
 
     return boltNames;
   }
-  public Collection<String> getBoltNames() {
-    getCachedPhysicalPlan();
-    return getBoltNames(cachedPhysicalPlan);
+
+  public List<String> getBoltNames() {
+    PhysicalPlan pp = getPhysicalPlan();
+    return getBoltNames(pp);
   }
 
   /**
    * A utility method to extract spout component names from the topology.
    *
-   * @return array of all spout names
+   * @return list of all spout names
    */
-  protected Collection<String> getSpoutNames(PhysicalPlan pp) {
+  public List<String> getSpoutNames(PhysicalPlan pp) {
     TopologyAPI.Topology localTopology = pp.getTopology();
     ArrayList<String> spoutNames = new ArrayList<>();
     for (TopologyAPI.Spout spout : localTopology.getSpoutsList()) {
@@ -133,15 +144,10 @@ public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
 
     return spoutNames;
   }
-  public Collection<String> getSpoutNames() {
-    getCachedPhysicalPlan();
-    return getSpoutNames(cachedPhysicalPlan);
-  }
 
-  public Collection<String> getSpoutBoltNames() {
-    getCachedPhysicalPlan();
-    Collection<String> ret = getBoltNames(cachedPhysicalPlan);
-    ret.addAll(getSpoutNames(cachedPhysicalPlan));
-    return ret;
+  public List<String> getSpoutNames() {
+    PhysicalPlan pp = getPhysicalPlan();
+    return getSpoutNames(pp);
   }
+
 }
diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/ToggleablePolicy.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/ToggleablePolicy.java
index c0021f1..195b53c 100644
--- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/ToggleablePolicy.java
+++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/ToggleablePolicy.java
@@ -75,7 +75,7 @@ public class ToggleablePolicy extends HealthPolicyImpl {
 
   @Override
   public Collection<Measurement> executeSensors() {
-    LOG.info("----------------------------------");
+    LOG.info("--------- policy: " + policyId + " ---------");
     try {
       for (TopologyAPI.Config.KeyValue kv
           : physicalPlanProvider.get().getTopology().getTopologyConfig().getKvsList()) {