You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/08/21 20:42:37 UTC

[incubator-heron] branch huijunw/fix2980 updated: reusecode

This is an automated email from the ASF dual-hosted git repository.

huijun pushed a commit to branch huijunw/fix2980
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/huijunw/fix2980 by this push:
     new acc123b  reusecode
acc123b is described below

commit acc123ba2d5b7b217a60c5790ac83612b2a0fefc
Author: Huijun Wu <hu...@twitter.com>
AuthorDate: Tue Aug 21 13:42:12 2018 -0700

    reusecode
---
 .../heron/scheduler/RuntimeManagerRunner.java      | 144 +++++++--------------
 1 file changed, 47 insertions(+), 97 deletions(-)

diff --git a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
index 5f2f264..8b05985 100644
--- a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
+++ b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
@@ -213,12 +213,8 @@ public class RuntimeManagerRunner {
       // Update user runtime config if userRuntimeConfig parameter is available
       updateTopologyUserRuntimeConfig(topologyName, userRuntimeConfig);
     } else if (newContainerNumber != null && !newContainerNumber.isEmpty()) {
-      int newContainers = Integer.parseInt(newContainerNumber);
-      Map<String, Integer> changeRequests = new HashMap<String, Integer>();
-      if (newParallelism != null && !newParallelism.isEmpty()) {
-        changeRequests = parseNewParallelismParam(newParallelism);
-      }
-      updatePackingPlan(topologyName, newContainers, changeRequests);
+      // Update container count if newContainerNumber parameter is available
+      updateTopologyContainerCount(topologyName, newContainerNumber, newParallelism);
     } else if (newParallelism != null && !newParallelism.isEmpty()) {
       // Update parallelism if newParallelism parameter is available
       updateTopologyComponentParallelism(topologyName, newParallelism);
@@ -237,25 +233,10 @@ public class RuntimeManagerRunner {
     return cPlan.getContainers().size();
   }
 
-  @VisibleForTesting
-  void updateTopologyComponentParallelism(String topologyName, String  newParallelism)
-      throws TopologyRuntimeManagementException, PackingException, UpdateDryRunResponse {
-    LOG.fine(String.format("updateTopologyHandler called for %s with %s",
-        topologyName, newParallelism));
-    SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime);
-    TopologyAPI.Topology topology = manager.getTopology(topologyName);
-    Map<String, Integer> changeRequests = parseNewParallelismParam(newParallelism);
-    PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName);
-
-    if (!changeDetected(currentPlan, changeRequests)) {
-      throw new TopologyRuntimeManagementException(
-          String.format("The component parallelism request (%s) is the same as the "
-              + "current topology parallelism. Not taking action.", newParallelism));
-    }
-
-    PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
-        topology);
-
+  void sendUpdateRequest(TopologyAPI.Topology topology,
+                         Map<String, Integer> changeRequests,
+                         PackingPlans.PackingPlan currentPlan,
+                         PackingPlans.PackingPlan proposedPlan) {
     if (Context.dryRun(config)) {
       PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
       PackingPlan oldPlan = deserializer.fromProto(currentPlan);
@@ -279,10 +260,39 @@ public class RuntimeManagerRunner {
   }
 
   @VisibleForTesting
-  void updatePackingPlan(String topologyName,
-                         Integer containerNum,
-                         Map<String, Integer> changeRequests)
+  void updateTopologyComponentParallelism(String topologyName, String  newParallelism)
+      throws TopologyRuntimeManagementException, PackingException, UpdateDryRunResponse {
+    LOG.fine(String.format("updateTopologyHandler called for %s with %s",
+        topologyName, newParallelism));
+    SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime);
+    TopologyAPI.Topology topology = manager.getTopology(topologyName);
+    Map<String, Integer> changeRequests = parseNewParallelismParam(newParallelism);
+    PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName);
+
+    if (!parallelismChangeDetected(currentPlan, changeRequests)) {
+      throw new TopologyRuntimeManagementException(
+          String.format("The component parallelism request (%s) is the same as the "
+              + "current topology parallelism. Not taking action.", newParallelism));
+    }
+
+    PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
+        null, topology);
+
+    sendUpdateRequest(topology, changeRequests, currentPlan, proposedPlan);
+  }
+
+  @VisibleForTesting
+  void updateTopologyContainerCount(String topologyName,
+                                    String newContainerNumber,
+                                    String  newParallelism)
       throws PackingException, UpdateDryRunResponse {
+    LOG.fine(String.format("updateTopologyHandler called for %s with %s and %s",
+        topologyName, newContainerNumber, newParallelism));
+    Integer containerNum = Integer.parseInt(newContainerNumber);
+    Map<String, Integer> changeRequests = new HashMap<String, Integer>();
+    if (newParallelism != null && !newParallelism.isEmpty()) {
+      changeRequests = parseNewParallelismParam(newParallelism);
+    }
 
     SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime);
     TopologyAPI.Topology topology = manager.getTopology(topologyName);
@@ -295,31 +305,11 @@ public class RuntimeManagerRunner {
           String.format("Both component parallelism request and container number are the "
               + "same as in the running topology."));
     }
-    // at least one of the two need to be changed
+
     PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
         containerNum, topology);
 
-    if (Context.dryRun(config)) {
-      PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
-      PackingPlan oldPlan = deserializer.fromProto(currentPlan);
-      PackingPlan newPlan = deserializer.fromProto(proposedPlan);
-      throw new UpdateDryRunResponse(topology, config, newPlan, oldPlan, changeRequests);
-    }
-
-    Scheduler.UpdateTopologyRequest updateTopologyRequest =
-        Scheduler.UpdateTopologyRequest.newBuilder()
-            .setCurrentPackingPlan(currentPlan)
-            .setProposedPackingPlan(proposedPlan)
-            .build();
-
-    LOG.fine("Sending Updating topology request: " + updateTopologyRequest);
-    if (!schedulerClient.updateTopology(updateTopologyRequest)) {
-      throw new TopologyRuntimeManagementException(
-          "Failed to update topology with Scheduler, updateTopologyRequest="
-              + updateTopologyRequest + "The topology can be in a strange stage. "
-              + "Please check carefully or redeploy the topology !!");
-    }
-
+    sendUpdateRequest(topology, changeRequests, currentPlan, proposedPlan);
   }
 
   @VisibleForTesting
@@ -408,6 +398,7 @@ public class RuntimeManagerRunner {
   @VisibleForTesting
   PackingPlans.PackingPlan buildNewPackingPlan(PackingPlans.PackingPlan currentProtoPlan,
                                                Map<String, Integer> changeRequests,
+                                               Integer containerNum,
                                                TopologyAPI.Topology topology)
       throws PackingException {
     PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
@@ -431,41 +422,12 @@ public class RuntimeManagerRunner {
     LOG.info("Updating packing plan using " + repackingClass);
     try {
       packing.initialize(config, topology);
-      PackingPlan packedPlan = packing.repack(currentPackingPlan, componentChanges);
-      return serializer.toProto(packedPlan);
-    } finally {
-      SysUtils.closeIgnoringExceptions(packing);
-    }
-  }
-
-  @VisibleForTesting
-  PackingPlans.PackingPlan buildNewPackingPlan(PackingPlans.PackingPlan currentProtoPlan,
-                                               Map<String, Integer> changeRequests,
-                                               int containerNum,
-                                               TopologyAPI.Topology topology)
-      throws PackingException {
-    PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
-    PackingPlanProtoSerializer serializer = new PackingPlanProtoSerializer();
-    PackingPlan currentPackingPlan = deserializer.fromProto(currentProtoPlan);
-
-    Map<String, Integer> componentCounts = currentPackingPlan.getComponentCounts();
-    Map<String, Integer> componentChanges = parallelismDelta(componentCounts, changeRequests);
-
-    // Create an instance of the packing class
-    String repackingClass = Context.repackingClass(config);
-    IRepacking packing;
-    try {
-      // create an instance of the packing class
-      packing = ReflectionUtils.newInstance(repackingClass);
-    } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
-      throw new IllegalArgumentException(
-          "Failed to instantiate packing instance: " + repackingClass, e);
-    }
-
-    LOG.info("Updating packing plan using " + repackingClass);
-    try {
-      packing.initialize(config, topology);
-      PackingPlan packedPlan = packing.repack(currentPackingPlan, containerNum, componentChanges);
+      PackingPlan packedPlan = null;
+      if (containerNum == null) {
+        packedPlan = packing.repack(currentPackingPlan, componentChanges);
+      } else {
+        packedPlan = packing.repack(currentPackingPlan, containerNum, componentChanges);
+      }
       return serializer.toProto(packedPlan);
     } finally {
       SysUtils.closeIgnoringExceptions(packing);
@@ -551,16 +513,4 @@ public class RuntimeManagerRunner {
     }
     return false;
   }
-
-  private static boolean changeDetected(PackingPlans.PackingPlan currentProtoPlan,
-                                        Map<String, Integer> changeRequests) {
-    PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
-    PackingPlan currentPlan = deserializer.fromProto(currentProtoPlan);
-    for (String component : changeRequests.keySet()) {
-      if (changeRequests.get(component) != currentPlan.getComponentCounts().get(component)) {
-        return true;
-      }
-    }
-    return false;
-  }
 }