You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2018/08/12 04:54:24 UTC

[GitHub] nlu90 commented on a change in pull request #2979: [healthmgr] refactor physicalplanprovider and topologyprovider

nlu90 commented on a change in pull request #2979: [healthmgr] refactor physicalplanprovider and topologyprovider
URL: https://github.com/apache/incubator-heron/pull/2979#discussion_r209443035
 
 

 ##########
 File path: heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java
 ##########
 @@ -19,70 +19,129 @@
 
 package org.apache.heron.healthmgr.common;
 
+import java.net.HttpURLConnection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 import javax.inject.Named;
 import javax.inject.Provider;
 
-import com.microsoft.dhalion.events.EventHandler;
-import com.microsoft.dhalion.events.EventManager;
-
-import org.apache.heron.healthmgr.common.HealthManagerEvents.ContainerRestart;
-import org.apache.heron.healthmgr.common.HealthManagerEvents.TopologyUpdate;
+import org.apache.heron.api.generated.TopologyAPI;
 import org.apache.heron.proto.system.PhysicalPlans.PhysicalPlan;
+import org.apache.heron.proto.tmaster.TopologyMaster;
 import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor;
+import org.apache.heron.spi.utils.NetworkUtils;
 
 import static org.apache.heron.healthmgr.HealthPolicyConfig.CONF_TOPOLOGY_NAME;
 
 /**
- * A topology's physical plan may get updated after initial deployment. This provider is used to
- * fetch the latest version from the state manager and provide to any dependent components.
+ * A topology's physical plan may get updated at runtime. This provider is used to
+ * fetch the latest version from the tmaster and provide to any dependent components.
  */
 public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
   private static final Logger LOG = Logger.getLogger(PhysicalPlanProvider.class.getName());
 
   private final SchedulerStateManagerAdaptor stateManagerAdaptor;
   private final String topologyName;
 
-  private PhysicalPlan physicalPlan;
+  private PhysicalPlan cachedPhysicalPlan = null;
 
   @Inject
   public PhysicalPlanProvider(SchedulerStateManagerAdaptor stateManagerAdaptor,
-      EventManager eventManager, @Named(CONF_TOPOLOGY_NAME) String topologyName) {
+                              @Named(CONF_TOPOLOGY_NAME) String topologyName) {
     this.stateManagerAdaptor = stateManagerAdaptor;
     this.topologyName = topologyName;
-    eventManager.addEventListener(TopologyUpdate.class, new EventHandler<TopologyUpdate>() {
-      /**
-       * Invalidates cached physical plan on receiving topology update notification
-       */
-      @Override
-      public synchronized void onEvent(TopologyUpdate event) {
-        LOG.info(
-            "Received topology update event, invalidating cached PhysicalPlan: " + event.type());
-        physicalPlan = null;
-      }
-    });
-    eventManager.addEventListener(ContainerRestart.class, new EventHandler<ContainerRestart>() {
-      /**
-       * Invalidates cached physical plan on receiving container restart notification
-       */
-      @Override
-      public synchronized void onEvent(ContainerRestart event) {
-        LOG.info("Received container restart event, invalidating cached PhysicalPlan: "
-            + event.type());
-        physicalPlan = null;
-      }
-    });
   }
 
   @Override
   public synchronized PhysicalPlan get() {
-    physicalPlan = stateManagerAdaptor.getPhysicalPlan(topologyName);
-    if (physicalPlan == null) {
+    TopologyMaster.TMasterLocation tMasterLocation
+        = stateManagerAdaptor.getTMasterLocation(topologyName);
+    String host = tMasterLocation.getHost();
+    int port = tMasterLocation.getControllerPort();
+
+    // construct metric cache stat url
+    String url = "http://" + host + ":" + port + "/get_current_physical_plan";
+    LOG.fine("tmaster physical plan query endpoint: " + url);
+
+    // http communication
+    HttpURLConnection con = NetworkUtils.getHttpConnection(url);
+    NetworkUtils.sendHttpGetRequest(con);
+    byte[] responseData = NetworkUtils.readHttpResponse(con);
+    // byte to base64 string
+    String encodedString = new String(responseData);
+    LOG.fine("tmaster returns physical plan in base64 str: " + encodedString);
+    // base64 string to proto bytes
+    byte[] decodedBytes = Base64.getDecoder().decode(encodedString);
+    // construct proto obj from bytes
+    PhysicalPlan physicalPlan = null;
+    try {
+      physicalPlan = PhysicalPlan.parseFrom(decodedBytes);
+    } catch (Exception e) {
       throw new InvalidStateException(topologyName, "Failed to fetch the physical plan");
     }
+
+    cachedPhysicalPlan = physicalPlan;
     return physicalPlan;
   }
 
+  public PhysicalPlan getCachedPhysicalPlan() {
+    try {
+      get();
+    } catch (InvalidStateException e) {
+      if (cachedPhysicalPlan == null) {
+        throw e;
+      }
+    }
+    return cachedPhysicalPlan;
+  }
+
+  /**
+   * A utility method to extract bolt component names from the topology.
+   *
+   * @return array of all bolt names
+   */
+  protected Collection<String> getBoltNames(PhysicalPlan pp) {
+    TopologyAPI.Topology localTopology = pp.getTopology();
+    ArrayList<String> boltNames = new ArrayList<>();
+    for (TopologyAPI.Bolt bolt : localTopology.getBoltsList()) {
+      boltNames.add(bolt.getComp().getName());
+    }
+
+    return boltNames;
+  }
+  public Collection<String> getBoltNames() {
 
 Review comment:
   blank line above

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services