You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/08/13 18:47:25 UTC
[incubator-heron] branch huijunw/refactorhealthmgr0.2.3 updated:
fixcomments
This is an automated email from the ASF dual-hosted git repository.
huijun pushed a commit to branch huijunw/refactorhealthmgr0.2.3
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/huijunw/refactorhealthmgr0.2.3 by this push:
new 4aa64a6 fixcomments
4aa64a6 is described below
commit 4aa64a6ac4c38e13cb7412ece8f4d1ae8f24df4e
Author: Huijun Wu <hu...@twitter.com>
AuthorDate: Mon Aug 13 11:47:12 2018 -0700
fixcomments
---
heron/executor/src/python/heron_executor.py | 3 +-
.../healthmgr/common/PhysicalPlanProvider.java | 76 ++++++++++++----------
.../heron/healthmgr/policy/ToggleablePolicy.java | 2 +-
3 files changed, 43 insertions(+), 38 deletions(-)
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index c53a608..d62b7da 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -506,8 +506,7 @@ class HeronExecutor(object):
"--role", self.role,
"--environment", self.environment,
"--topology_name", self.topology_name,
- "--metricsmgr_port", self.metrics_manager_port,
- "--verbose"]
+ "--metricsmgr_port", self.metrics_manager_port]
return healthmgr_cmd
diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java
index 2047d21..a563cd0 100644
--- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java
+++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java
@@ -21,9 +21,8 @@ package org.apache.heron.healthmgr.common;
import java.net.HttpURLConnection;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Base64;
-import java.util.Collection;
+import java.util.List;
import java.util.logging.Logger;
import javax.inject.Inject;
@@ -48,7 +47,8 @@ public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
private final SchedulerStateManagerAdaptor stateManagerAdaptor;
private final String topologyName;
- private PhysicalPlan cachedPhysicalPlan = null;
+ // Cache the physical plan between two successful get() invocations.
+ private PhysicalPlan physicalPlan = null;
@Inject
public PhysicalPlanProvider(SchedulerStateManagerAdaptor stateManagerAdaptor,
@@ -57,6 +57,22 @@ public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
this.topologyName = topologyName;
}
+ protected PhysicalPlan ParseResponseToPhysicalPlan(byte[] responseData) {
+ // byte to base64 string
+ String encodedString = new String(responseData);
+ LOG.fine("tmaster returns physical plan in base64 str: " + encodedString);
+ // base64 string to proto bytes
+ byte[] decodedBytes = Base64.getDecoder().decode(encodedString);
+ // construct proto obj from bytes
+ PhysicalPlan physicalPlan = null;
+ try {
+ physicalPlan = PhysicalPlan.parseFrom(decodedBytes);
+ } catch (Exception e) {
+ throw new InvalidStateException(topologyName, "Failed to fetch the physical plan");
+ }
+ return physicalPlan;
+ }
+
@Override
public synchronized PhysicalPlan get() {
TopologyMaster.TMasterLocation tMasterLocation
@@ -72,40 +88,34 @@ public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
HttpURLConnection con = NetworkUtils.getHttpConnection(url);
NetworkUtils.sendHttpGetRequest(con);
byte[] responseData = NetworkUtils.readHttpResponse(con);
- // byte to base64 string
- String encodedString = new String(responseData);
- LOG.fine("tmaster returns physical plan in base64 str: " + encodedString);
- // base64 string to proto bytes
- byte[] decodedBytes = Base64.getDecoder().decode(encodedString);
- // construct proto obj from bytes
- PhysicalPlan physicalPlan = null;
- try {
- physicalPlan = PhysicalPlan.parseFrom(decodedBytes);
- } catch (Exception e) {
- throw new InvalidStateException(topologyName, "Failed to fetch the physical plan");
- }
- cachedPhysicalPlan = physicalPlan;
+ physicalPlan = ParseResponseToPhysicalPlan(responseData);
return physicalPlan;
}
- public PhysicalPlan getCachedPhysicalPlan() {
+ /**
+ * try best effort to return a latest physical plan
+ * 1. refresh physical plan
+ * 2. if refreshing fails, return the last physical plan
+ * @return physical plan
+ */
+ public PhysicalPlan getPhysicalPlan() {
try {
get();
} catch (InvalidStateException e) {
- if (cachedPhysicalPlan == null) {
+ if (physicalPlan == null) {
throw e;
}
}
- return cachedPhysicalPlan;
+ return physicalPlan;
}
/**
* A utility method to extract bolt component names from the topology.
*
- * @return array of all bolt names
+ * @return list of all bolt names
*/
- protected Collection<String> getBoltNames(PhysicalPlan pp) {
+ public List<String> getBoltNames(PhysicalPlan pp) {
TopologyAPI.Topology localTopology = pp.getTopology();
ArrayList<String> boltNames = new ArrayList<>();
for (TopologyAPI.Bolt bolt : localTopology.getBoltsList()) {
@@ -114,17 +124,18 @@ public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
return boltNames;
}
- public Collection<String> getBoltNames() {
- getCachedPhysicalPlan();
- return getBoltNames(cachedPhysicalPlan);
+
+ public List<String> getBoltNames() {
+ PhysicalPlan pp = getPhysicalPlan();
+ return getBoltNames(pp);
}
/**
* A utility method to extract spout component names from the topology.
*
- * @return array of all spout names
+ * @return list of all spout names
*/
- protected Collection<String> getSpoutNames(PhysicalPlan pp) {
+ public List<String> getSpoutNames(PhysicalPlan pp) {
TopologyAPI.Topology localTopology = pp.getTopology();
ArrayList<String> spoutNames = new ArrayList<>();
for (TopologyAPI.Spout spout : localTopology.getSpoutsList()) {
@@ -133,15 +144,10 @@ public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
return spoutNames;
}
- public Collection<String> getSpoutNames() {
- getCachedPhysicalPlan();
- return getSpoutNames(cachedPhysicalPlan);
- }
- public Collection<String> getSpoutBoltNames() {
- getCachedPhysicalPlan();
- Collection<String> ret = getBoltNames(cachedPhysicalPlan);
- ret.addAll(getSpoutNames(cachedPhysicalPlan));
- return ret;
+ public List<String> getSpoutNames() {
+ PhysicalPlan pp = getPhysicalPlan();
+ return getSpoutNames(pp);
}
+
}
diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/ToggleablePolicy.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/ToggleablePolicy.java
index c0021f1..195b53c 100644
--- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/ToggleablePolicy.java
+++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/policy/ToggleablePolicy.java
@@ -75,7 +75,7 @@ public class ToggleablePolicy extends HealthPolicyImpl {
@Override
public Collection<Measurement> executeSensors() {
- LOG.info("----------------------------------");
+ LOG.info("--------- policy: " + policyId + " ---------");
try {
for (TopologyAPI.Config.KeyValue kv
: physicalPlanProvider.get().getTopology().getTopologyConfig().getKvsList()) {