You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/08/21 22:27:02 UTC

[incubator-heron] branch master updated: get back newParallelism ifelse branch (#2992)

This is an automated email from the ASF dual-hosted git repository.

huijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 6705259  get back newParallelism ifelse branch (#2992)
6705259 is described below

commit 6705259a2d5adda86fedb543ad43c6cef91647eb
Author: bed debug <hu...@users.noreply.github.com>
AuthorDate: Tue Aug 21 15:26:58 2018 -0700

    get back newParallelism ifelse branch (#2992)
    
    * get back newParallelism ifelse branch
    
    * revert unit test
    
    * reuse code
    
    * reorg
    
    * merge log
---
 .../heron/scheduler/RuntimeManagerRunner.java      | 109 ++++++++++++++-------
 .../heron/scheduler/RuntimeManagerRunnerTest.java  |   4 +-
 2 files changed, 74 insertions(+), 39 deletions(-)

diff --git a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
index 1805cab..38c4a94 100644
--- a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
+++ b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
@@ -197,6 +197,9 @@ public class RuntimeManagerRunner {
     String newParallelism = updateConfig.getStringValue(RUNTIME_MANAGER_COMPONENT_PARALLELISM_KEY);
     String newContainerNumber = updateConfig.getStringValue(RUNTIME_MANAGER_CONTAINER_NUMBER_KEY);
     String userRuntimeConfig = updateConfig.getStringValue(RUNTIME_MANAGER_RUNTIME_CONFIG_KEY);
+    LOG.info("userRuntimeConfig " + userRuntimeConfig
+        + "; newParallelism " + newParallelism
+        + "; newContainerNumber " + newContainerNumber);
 
     // parallelism and runtime config can not be updated at the same time.
     if (((newParallelism != null && !newParallelism.isEmpty())
@@ -209,19 +212,12 @@ public class RuntimeManagerRunner {
     if (userRuntimeConfig != null && !userRuntimeConfig.isEmpty()) {
       // Update user runtime config if userRuntimeConfig parameter is available
       updateTopologyUserRuntimeConfig(topologyName, userRuntimeConfig);
-    } else if ((newParallelism != null && !newParallelism.isEmpty())
-        || (newContainerNumber != null && !newContainerNumber.isEmpty())) {
-      int newContainers = getCurrentContainerNumber(topologyName);
-      Map<String, Integer> changeRequests = new HashMap<String, Integer>();
-
-      if (newParallelism != null && !newParallelism.isEmpty()) {
-        changeRequests = parseNewParallelismParam(newParallelism);
-      }
-      if (newContainerNumber != null && !newContainerNumber.isEmpty()) {
-        newContainers = Integer.parseInt(newContainerNumber);
-      }
-      updatePackingPlan(topologyName, newContainers, changeRequests);
-
+    } else if (newContainerNumber != null && !newContainerNumber.isEmpty()) {
+      // Update container count if newContainerNumber parameter is available
+      updateTopologyContainerCount(topologyName, newContainerNumber, newParallelism);
+    } else if (newParallelism != null && !newParallelism.isEmpty()) {
+      // Update parallelism if newParallelism parameter is available
+      updateTopologyComponentParallelism(topologyName, newParallelism);
     } else {
       throw new TopologyRuntimeManagementException("Missing arguments. Not taking action.");
     }
@@ -237,28 +233,10 @@ public class RuntimeManagerRunner {
     return cPlan.getContainers().size();
   }
 
-
-  @VisibleForTesting
-  void updatePackingPlan(String topologyName,
-                         Integer containerNum,
-                         Map<String, Integer> changeRequests)
-      throws PackingException, UpdateDryRunResponse {
-
-    SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime);
-    TopologyAPI.Topology topology = manager.getTopology(topologyName);
-    PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName);
-    boolean parallelismChange = parallelismChangeDetected(currentPlan, changeRequests);
-    boolean containerChange = containersNumChangeDetected(currentPlan, containerNum);
-
-    if (!parallelismChange && !containerChange) {
-      throw new TopologyRuntimeManagementException(
-          String.format("Both component parallelism request and container number are the "
-              + "same as in the running topology."));
-    }
-    // at least one of the two need to be changed
-    PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
-        containerNum, topology);
-
+  void sendUpdateRequest(TopologyAPI.Topology topology,
+                         Map<String, Integer> changeRequests,
+                         PackingPlans.PackingPlan currentPlan,
+                         PackingPlans.PackingPlan proposedPlan) {
     if (Context.dryRun(config)) {
       PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
       PackingPlan oldPlan = deserializer.fromProto(currentPlan);
@@ -279,7 +257,59 @@ public class RuntimeManagerRunner {
               + updateTopologyRequest + "The topology can be in a strange stage. "
               + "Please check carefully or redeploy the topology !!");
     }
+  }
 
+  @VisibleForTesting
+  void updateTopologyComponentParallelism(String topologyName, String  newParallelism)
+      throws TopologyRuntimeManagementException, PackingException, UpdateDryRunResponse {
+    LOG.fine(String.format("updateTopologyHandler called for %s with %s",
+        topologyName, newParallelism));
+    Map<String, Integer> changeRequests = parseNewParallelismParam(newParallelism);
+
+    SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime);
+    TopologyAPI.Topology topology = manager.getTopology(topologyName);
+    PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName);
+
+    if (!parallelismChangeDetected(currentPlan, changeRequests)) {
+      throw new TopologyRuntimeManagementException(
+          String.format("The component parallelism request (%s) is the same as the "
+              + "current topology parallelism. Not taking action.", newParallelism));
+    }
+
+    PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
+        null, topology);
+
+    sendUpdateRequest(topology, changeRequests, currentPlan, proposedPlan);
+  }
+
+  @VisibleForTesting
+  void updateTopologyContainerCount(String topologyName,
+                                    String newContainerNumber,
+                                    String  newParallelism)
+      throws PackingException, UpdateDryRunResponse {
+    LOG.fine(String.format("updateTopologyHandler called for %s with %s and %s",
+        topologyName, newContainerNumber, newParallelism));
+    Integer containerNum = Integer.parseInt(newContainerNumber);
+    Map<String, Integer> changeRequests = new HashMap<String, Integer>();
+    if (newParallelism != null && !newParallelism.isEmpty()) {
+      changeRequests = parseNewParallelismParam(newParallelism);
+    }
+
+    SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime);
+    TopologyAPI.Topology topology = manager.getTopology(topologyName);
+    PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName);
+
+    if (!containersNumChangeDetected(currentPlan, containerNum)
+        && !parallelismChangeDetected(currentPlan, changeRequests)) {
+      throw new TopologyRuntimeManagementException(
+          String.format("Both component parallelism request and container number are the "
+              + "same as in the running topology."));
+    }
+
+    PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
+        containerNum, topology);
+
+    sendUpdateRequest(topology, changeRequests, currentPlan, proposedPlan);
   }
 
   @VisibleForTesting
@@ -368,7 +398,7 @@ public class RuntimeManagerRunner {
   @VisibleForTesting
   PackingPlans.PackingPlan buildNewPackingPlan(PackingPlans.PackingPlan currentProtoPlan,
                                                Map<String, Integer> changeRequests,
-                                               int containerNum,
+                                               Integer containerNum,
                                                TopologyAPI.Topology topology)
       throws PackingException {
     PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
@@ -392,7 +422,12 @@ public class RuntimeManagerRunner {
     LOG.info("Updating packing plan using " + repackingClass);
     try {
       packing.initialize(config, topology);
-      PackingPlan packedPlan = packing.repack(currentPackingPlan, containerNum, componentChanges);
+      PackingPlan packedPlan = null;
+      if (containerNum == null) {
+        packedPlan = packing.repack(currentPackingPlan, componentChanges);
+      } else {
+        packedPlan = packing.repack(currentPackingPlan, containerNum, componentChanges);
+      }
       return serializer.toProto(packedPlan);
     } finally {
       SysUtils.closeIgnoringExceptions(packing);
diff --git a/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java b/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java
index 5e635ab..d48bc17 100644
--- a/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java
+++ b/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java
@@ -236,7 +236,7 @@ public class RuntimeManagerRunnerTest {
 
     when(manager.getPackingPlan(eq(TOPOLOGY_NAME))).thenReturn(currentPlan);
     doReturn(proposedPlan).when(runner).buildNewPackingPlan(
-        eq(currentPlan), eq(changeRequests), 1, any(TopologyAPI.Topology.class));
+        eq(currentPlan), eq(changeRequests), any(TopologyAPI.Topology.class));
 
     Scheduler.UpdateTopologyRequest updateTopologyRequest =
         Scheduler.UpdateTopologyRequest.newBuilder()
@@ -246,7 +246,7 @@ public class RuntimeManagerRunnerTest {
 
     when(client.updateTopology(updateTopologyRequest)).thenReturn(true);
     try {
-      runner.updatePackingPlan(TOPOLOGY_NAME, 1, changeRequests);
+      runner.updateTopologyComponentParallelism(TOPOLOGY_NAME, newParallelism);
     } finally {
       int expectedClientUpdateCalls = expectedResult ? 1 : 0;
       verify(client, times(expectedClientUpdateCalls)).updateTopology(updateTopologyRequest);