You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/08/21 20:42:37 UTC
[incubator-heron] branch huijunw/fix2980 updated: reusecode
This is an automated email from the ASF dual-hosted git repository.
huijun pushed a commit to branch huijunw/fix2980
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/huijunw/fix2980 by this push:
new acc123b reusecode
acc123b is described below
commit acc123ba2d5b7b217a60c5790ac83612b2a0fefc
Author: Huijun Wu <hu...@twitter.com>
AuthorDate: Tue Aug 21 13:42:12 2018 -0700
reusecode
---
.../heron/scheduler/RuntimeManagerRunner.java | 144 +++++++--------------
1 file changed, 47 insertions(+), 97 deletions(-)
diff --git a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
index 5f2f264..8b05985 100644
--- a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
+++ b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
@@ -213,12 +213,8 @@ public class RuntimeManagerRunner {
// Update user runtime config if userRuntimeConfig parameter is available
updateTopologyUserRuntimeConfig(topologyName, userRuntimeConfig);
} else if (newContainerNumber != null && !newContainerNumber.isEmpty()) {
- int newContainers = Integer.parseInt(newContainerNumber);
- Map<String, Integer> changeRequests = new HashMap<String, Integer>();
- if (newParallelism != null && !newParallelism.isEmpty()) {
- changeRequests = parseNewParallelismParam(newParallelism);
- }
- updatePackingPlan(topologyName, newContainers, changeRequests);
+ // Update container count if newContainerNumber parameter is available
+ updateTopologyContainerCount(topologyName, newContainerNumber, newParallelism);
} else if (newParallelism != null && !newParallelism.isEmpty()) {
// Update parallelism if newParallelism parameter is available
updateTopologyComponentParallelism(topologyName, newParallelism);
@@ -237,25 +233,10 @@ public class RuntimeManagerRunner {
return cPlan.getContainers().size();
}
- @VisibleForTesting
- void updateTopologyComponentParallelism(String topologyName, String newParallelism)
- throws TopologyRuntimeManagementException, PackingException, UpdateDryRunResponse {
- LOG.fine(String.format("updateTopologyHandler called for %s with %s",
- topologyName, newParallelism));
- SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime);
- TopologyAPI.Topology topology = manager.getTopology(topologyName);
- Map<String, Integer> changeRequests = parseNewParallelismParam(newParallelism);
- PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName);
-
- if (!changeDetected(currentPlan, changeRequests)) {
- throw new TopologyRuntimeManagementException(
- String.format("The component parallelism request (%s) is the same as the "
- + "current topology parallelism. Not taking action.", newParallelism));
- }
-
- PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
- topology);
-
+ void sendUpdateRequest(TopologyAPI.Topology topology,
+ Map<String, Integer> changeRequests,
+ PackingPlans.PackingPlan currentPlan,
+ PackingPlans.PackingPlan proposedPlan) {
if (Context.dryRun(config)) {
PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
PackingPlan oldPlan = deserializer.fromProto(currentPlan);
@@ -279,10 +260,39 @@ public class RuntimeManagerRunner {
}
@VisibleForTesting
- void updatePackingPlan(String topologyName,
- Integer containerNum,
- Map<String, Integer> changeRequests)
+ void updateTopologyComponentParallelism(String topologyName, String newParallelism)
+ throws TopologyRuntimeManagementException, PackingException, UpdateDryRunResponse {
+ LOG.fine(String.format("updateTopologyHandler called for %s with %s",
+ topologyName, newParallelism));
+ SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime);
+ TopologyAPI.Topology topology = manager.getTopology(topologyName);
+ Map<String, Integer> changeRequests = parseNewParallelismParam(newParallelism);
+ PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName);
+
+ if (!parallelismChangeDetected(currentPlan, changeRequests)) {
+ throw new TopologyRuntimeManagementException(
+ String.format("The component parallelism request (%s) is the same as the "
+ + "current topology parallelism. Not taking action.", newParallelism));
+ }
+
+ PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
+ null, topology);
+
+ sendUpdateRequest(topology, changeRequests, currentPlan, proposedPlan);
+ }
+
+ @VisibleForTesting
+ void updateTopologyContainerCount(String topologyName,
+ String newContainerNumber,
+ String newParallelism)
throws PackingException, UpdateDryRunResponse {
+ LOG.fine(String.format("updateTopologyHandler called for %s with %s and %s",
+ topologyName, newContainerNumber, newParallelism));
+ Integer containerNum = Integer.parseInt(newContainerNumber);
+ Map<String, Integer> changeRequests = new HashMap<String, Integer>();
+ if (newParallelism != null && !newParallelism.isEmpty()) {
+ changeRequests = parseNewParallelismParam(newParallelism);
+ }
SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime);
TopologyAPI.Topology topology = manager.getTopology(topologyName);
@@ -295,31 +305,11 @@ public class RuntimeManagerRunner {
String.format("Both component parallelism request and container number are the "
+ "same as in the running topology."));
}
- // at least one of the two need to be changed
+
PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
containerNum, topology);
- if (Context.dryRun(config)) {
- PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
- PackingPlan oldPlan = deserializer.fromProto(currentPlan);
- PackingPlan newPlan = deserializer.fromProto(proposedPlan);
- throw new UpdateDryRunResponse(topology, config, newPlan, oldPlan, changeRequests);
- }
-
- Scheduler.UpdateTopologyRequest updateTopologyRequest =
- Scheduler.UpdateTopologyRequest.newBuilder()
- .setCurrentPackingPlan(currentPlan)
- .setProposedPackingPlan(proposedPlan)
- .build();
-
- LOG.fine("Sending Updating topology request: " + updateTopologyRequest);
- if (!schedulerClient.updateTopology(updateTopologyRequest)) {
- throw new TopologyRuntimeManagementException(
- "Failed to update topology with Scheduler, updateTopologyRequest="
- + updateTopologyRequest + "The topology can be in a strange stage. "
- + "Please check carefully or redeploy the topology !!");
- }
-
+ sendUpdateRequest(topology, changeRequests, currentPlan, proposedPlan);
}
@VisibleForTesting
@@ -408,6 +398,7 @@ public class RuntimeManagerRunner {
@VisibleForTesting
PackingPlans.PackingPlan buildNewPackingPlan(PackingPlans.PackingPlan currentProtoPlan,
Map<String, Integer> changeRequests,
+ Integer containerNum,
TopologyAPI.Topology topology)
throws PackingException {
PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
@@ -431,41 +422,12 @@ public class RuntimeManagerRunner {
LOG.info("Updating packing plan using " + repackingClass);
try {
packing.initialize(config, topology);
- PackingPlan packedPlan = packing.repack(currentPackingPlan, componentChanges);
- return serializer.toProto(packedPlan);
- } finally {
- SysUtils.closeIgnoringExceptions(packing);
- }
- }
-
- @VisibleForTesting
- PackingPlans.PackingPlan buildNewPackingPlan(PackingPlans.PackingPlan currentProtoPlan,
- Map<String, Integer> changeRequests,
- int containerNum,
- TopologyAPI.Topology topology)
- throws PackingException {
- PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
- PackingPlanProtoSerializer serializer = new PackingPlanProtoSerializer();
- PackingPlan currentPackingPlan = deserializer.fromProto(currentProtoPlan);
-
- Map<String, Integer> componentCounts = currentPackingPlan.getComponentCounts();
- Map<String, Integer> componentChanges = parallelismDelta(componentCounts, changeRequests);
-
- // Create an instance of the packing class
- String repackingClass = Context.repackingClass(config);
- IRepacking packing;
- try {
- // create an instance of the packing class
- packing = ReflectionUtils.newInstance(repackingClass);
- } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
- throw new IllegalArgumentException(
- "Failed to instantiate packing instance: " + repackingClass, e);
- }
-
- LOG.info("Updating packing plan using " + repackingClass);
- try {
- packing.initialize(config, topology);
- PackingPlan packedPlan = packing.repack(currentPackingPlan, containerNum, componentChanges);
+ PackingPlan packedPlan = null;
+ if (containerNum == null) {
+ packedPlan = packing.repack(currentPackingPlan, componentChanges);
+ } else {
+ packedPlan = packing.repack(currentPackingPlan, containerNum, componentChanges);
+ }
return serializer.toProto(packedPlan);
} finally {
SysUtils.closeIgnoringExceptions(packing);
@@ -551,16 +513,4 @@ public class RuntimeManagerRunner {
}
return false;
}
-
- private static boolean changeDetected(PackingPlans.PackingPlan currentProtoPlan,
- Map<String, Integer> changeRequests) {
- PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
- PackingPlan currentPlan = deserializer.fromProto(currentProtoPlan);
- for (String component : changeRequests.keySet()) {
- if (changeRequests.get(component) != currentPlan.getComponentCounts().get(component)) {
- return true;
- }
- }
- return false;
- }
}