You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/08/21 07:54:52 UTC

[incubator-heron] branch huijunw/fix2980 created (now 299765a)

This is an automated email from the ASF dual-hosted git repository.

huijun pushed a change to branch huijunw/fix2980
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git.


      at 299765a  get back newParallelism ifelse branch

This branch includes the following new commits:

     new 299765a  get back newParallelism ifelse branch

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-heron] 01/01: get back newParallelism ifelse branch

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

huijun pushed a commit to branch huijunw/fix2980
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git

commit 299765adb0ec7970542d5f689568b9e18fa73e08
Author: Huijun Wu <hu...@twitter.com>
AuthorDate: Tue Aug 21 00:54:23 2018 -0700

    get back newParallelism ifelse branch
---
 .../heron/scheduler/RuntimeManagerRunner.java      | 101 +++++++++++++++++++--
 1 file changed, 93 insertions(+), 8 deletions(-)

diff --git a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
index 1805cab..5f2f264 100644
--- a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
+++ b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
@@ -195,8 +195,11 @@ public class RuntimeManagerRunner {
       throws TopologyRuntimeManagementException, PackingException, UpdateDryRunResponse {
     assert !potentialStaleExecutionData;
     String newParallelism = updateConfig.getStringValue(RUNTIME_MANAGER_COMPONENT_PARALLELISM_KEY);
+    LOG.info("newParallelism " + newParallelism);
     String newContainerNumber = updateConfig.getStringValue(RUNTIME_MANAGER_CONTAINER_NUMBER_KEY);
+    LOG.info("newContainerNumber " + newContainerNumber);
     String userRuntimeConfig = updateConfig.getStringValue(RUNTIME_MANAGER_RUNTIME_CONFIG_KEY);
+    LOG.info("userRuntimeConfig " + userRuntimeConfig);
 
     // parallelism and runtime config can not be updated at the same time.
     if (((newParallelism != null && !newParallelism.isEmpty())
@@ -209,19 +212,16 @@ public class RuntimeManagerRunner {
     if (userRuntimeConfig != null && !userRuntimeConfig.isEmpty()) {
       // Update user runtime config if userRuntimeConfig parameter is available
       updateTopologyUserRuntimeConfig(topologyName, userRuntimeConfig);
-    } else if ((newParallelism != null && !newParallelism.isEmpty())
-        || (newContainerNumber != null && !newContainerNumber.isEmpty())) {
-      int newContainers = getCurrentContainerNumber(topologyName);
+    } else if (newContainerNumber != null && !newContainerNumber.isEmpty()) {
+      int newContainers = Integer.parseInt(newContainerNumber);
       Map<String, Integer> changeRequests = new HashMap<String, Integer>();
-
       if (newParallelism != null && !newParallelism.isEmpty()) {
         changeRequests = parseNewParallelismParam(newParallelism);
       }
-      if (newContainerNumber != null && !newContainerNumber.isEmpty()) {
-        newContainers = Integer.parseInt(newContainerNumber);
-      }
       updatePackingPlan(topologyName, newContainers, changeRequests);
-
+    } else if (newParallelism != null && !newParallelism.isEmpty()) {
+      // Update parallelism if newParallelism parameter is available
+      updateTopologyComponentParallelism(topologyName, newParallelism);
     } else {
       throw new TopologyRuntimeManagementException("Missing arguments. Not taking action.");
     }
@@ -237,6 +237,46 @@ public class RuntimeManagerRunner {
     return cPlan.getContainers().size();
   }
 
+  @VisibleForTesting
+  void updateTopologyComponentParallelism(String topologyName, String  newParallelism)
+      throws TopologyRuntimeManagementException, PackingException, UpdateDryRunResponse {
+    LOG.fine(String.format("updateTopologyHandler called for %s with %s",
+        topologyName, newParallelism));
+    SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime);
+    TopologyAPI.Topology topology = manager.getTopology(topologyName);
+    Map<String, Integer> changeRequests = parseNewParallelismParam(newParallelism);
+    PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName);
+
+    if (!changeDetected(currentPlan, changeRequests)) {
+      throw new TopologyRuntimeManagementException(
+          String.format("The component parallelism request (%s) is the same as the "
+              + "current topology parallelism. Not taking action.", newParallelism));
+    }
+
+    PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
+        topology);
+
+    if (Context.dryRun(config)) {
+      PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
+      PackingPlan oldPlan = deserializer.fromProto(currentPlan);
+      PackingPlan newPlan = deserializer.fromProto(proposedPlan);
+      throw new UpdateDryRunResponse(topology, config, newPlan, oldPlan, changeRequests);
+    }
+
+    Scheduler.UpdateTopologyRequest updateTopologyRequest =
+        Scheduler.UpdateTopologyRequest.newBuilder()
+            .setCurrentPackingPlan(currentPlan)
+            .setProposedPackingPlan(proposedPlan)
+            .build();
+
+    LOG.fine("Sending Updating topology request: " + updateTopologyRequest);
+    if (!schedulerClient.updateTopology(updateTopologyRequest)) {
+      throw new TopologyRuntimeManagementException(
+          "Failed to update topology with Scheduler, updateTopologyRequest="
+              + updateTopologyRequest + "The topology can be in a strange stage. "
+              + "Please check carefully or redeploy the topology !!");
+    }
+  }
 
   @VisibleForTesting
   void updatePackingPlan(String topologyName,
@@ -368,6 +408,39 @@ public class RuntimeManagerRunner {
   @VisibleForTesting
   PackingPlans.PackingPlan buildNewPackingPlan(PackingPlans.PackingPlan currentProtoPlan,
                                                Map<String, Integer> changeRequests,
+                                               TopologyAPI.Topology topology)
+      throws PackingException {
+    PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
+    PackingPlanProtoSerializer serializer = new PackingPlanProtoSerializer();
+    PackingPlan currentPackingPlan = deserializer.fromProto(currentProtoPlan);
+
+    Map<String, Integer> componentCounts = currentPackingPlan.getComponentCounts();
+    Map<String, Integer> componentChanges = parallelismDelta(componentCounts, changeRequests);
+
+    // Create an instance of the packing class
+    String repackingClass = Context.repackingClass(config);
+    IRepacking packing;
+    try {
+      // create an instance of the packing class
+      packing = ReflectionUtils.newInstance(repackingClass);
+    } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
+      throw new IllegalArgumentException(
+          "Failed to instantiate packing instance: " + repackingClass, e);
+    }
+
+    LOG.info("Updating packing plan using " + repackingClass);
+    try {
+      packing.initialize(config, topology);
+      PackingPlan packedPlan = packing.repack(currentPackingPlan, componentChanges);
+      return serializer.toProto(packedPlan);
+    } finally {
+      SysUtils.closeIgnoringExceptions(packing);
+    }
+  }
+
+  @VisibleForTesting
+  PackingPlans.PackingPlan buildNewPackingPlan(PackingPlans.PackingPlan currentProtoPlan,
+                                               Map<String, Integer> changeRequests,
                                                int containerNum,
                                                TopologyAPI.Topology topology)
       throws PackingException {
@@ -478,4 +551,16 @@ public class RuntimeManagerRunner {
     }
     return false;
   }
+
+  private static boolean changeDetected(PackingPlans.PackingPlan currentProtoPlan,
+                                        Map<String, Integer> changeRequests) {
+    PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
+    PackingPlan currentPlan = deserializer.fromProto(currentProtoPlan);
+    for (String component : changeRequests.keySet()) {
+      if (changeRequests.get(component) != currentPlan.getComponentCounts().get(component)) {
+        return true;
+      }
+    }
+    return false;
+  }
 }