You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/08/21 22:27:02 UTC
[incubator-heron] branch master updated: get back newParallelism
ifelse branch (#2992)
This is an automated email from the ASF dual-hosted git repository.
huijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 6705259 get back newParallelism ifelse branch (#2992)
6705259 is described below
commit 6705259a2d5adda86fedb543ad43c6cef91647eb
Author: bed debug <hu...@users.noreply.github.com>
AuthorDate: Tue Aug 21 15:26:58 2018 -0700
get back newParallelism ifelse branch (#2992)
* get back newParallelism ifelse branch
* revertunit test
* reusecode
* reorg
* mergelog
---
.../heron/scheduler/RuntimeManagerRunner.java | 109 ++++++++++++++-------
.../heron/scheduler/RuntimeManagerRunnerTest.java | 4 +-
2 files changed, 74 insertions(+), 39 deletions(-)
diff --git a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
index 1805cab..38c4a94 100644
--- a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
+++ b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
@@ -197,6 +197,9 @@ public class RuntimeManagerRunner {
String newParallelism = updateConfig.getStringValue(RUNTIME_MANAGER_COMPONENT_PARALLELISM_KEY);
String newContainerNumber = updateConfig.getStringValue(RUNTIME_MANAGER_CONTAINER_NUMBER_KEY);
String userRuntimeConfig = updateConfig.getStringValue(RUNTIME_MANAGER_RUNTIME_CONFIG_KEY);
+ LOG.info("userRuntimeConfig " + userRuntimeConfig
+ + "; newParallelism " + newParallelism
+ + "; newContainerNumber " + newContainerNumber);
// parallelism and runtime config can not be updated at the same time.
if (((newParallelism != null && !newParallelism.isEmpty())
@@ -209,19 +212,12 @@ public class RuntimeManagerRunner {
if (userRuntimeConfig != null && !userRuntimeConfig.isEmpty()) {
// Update user runtime config if userRuntimeConfig parameter is available
updateTopologyUserRuntimeConfig(topologyName, userRuntimeConfig);
- } else if ((newParallelism != null && !newParallelism.isEmpty())
- || (newContainerNumber != null && !newContainerNumber.isEmpty())) {
- int newContainers = getCurrentContainerNumber(topologyName);
- Map<String, Integer> changeRequests = new HashMap<String, Integer>();
-
- if (newParallelism != null && !newParallelism.isEmpty()) {
- changeRequests = parseNewParallelismParam(newParallelism);
- }
- if (newContainerNumber != null && !newContainerNumber.isEmpty()) {
- newContainers = Integer.parseInt(newContainerNumber);
- }
- updatePackingPlan(topologyName, newContainers, changeRequests);
-
+ } else if (newContainerNumber != null && !newContainerNumber.isEmpty()) {
+ // Update container count if newContainerNumber parameter is available
+ updateTopologyContainerCount(topologyName, newContainerNumber, newParallelism);
+ } else if (newParallelism != null && !newParallelism.isEmpty()) {
+ // Update parallelism if newParallelism parameter is available
+ updateTopologyComponentParallelism(topologyName, newParallelism);
} else {
throw new TopologyRuntimeManagementException("Missing arguments. Not taking action.");
}
@@ -237,28 +233,10 @@ public class RuntimeManagerRunner {
return cPlan.getContainers().size();
}
-
- @VisibleForTesting
- void updatePackingPlan(String topologyName,
- Integer containerNum,
- Map<String, Integer> changeRequests)
- throws PackingException, UpdateDryRunResponse {
-
- SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime);
- TopologyAPI.Topology topology = manager.getTopology(topologyName);
- PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName);
- boolean parallelismChange = parallelismChangeDetected(currentPlan, changeRequests);
- boolean containerChange = containersNumChangeDetected(currentPlan, containerNum);
-
- if (!parallelismChange && !containerChange) {
- throw new TopologyRuntimeManagementException(
- String.format("Both component parallelism request and container number are the "
- + "same as in the running topology."));
- }
- // at least one of the two need to be changed
- PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
- containerNum, topology);
-
+ void sendUpdateRequest(TopologyAPI.Topology topology,
+ Map<String, Integer> changeRequests,
+ PackingPlans.PackingPlan currentPlan,
+ PackingPlans.PackingPlan proposedPlan) {
if (Context.dryRun(config)) {
PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
PackingPlan oldPlan = deserializer.fromProto(currentPlan);
@@ -279,7 +257,59 @@ public class RuntimeManagerRunner {
+ updateTopologyRequest + "The topology can be in a strange stage. "
+ "Please check carefully or redeploy the topology !!");
}
+ }
+ @VisibleForTesting
+ void updateTopologyComponentParallelism(String topologyName, String newParallelism)
+ throws TopologyRuntimeManagementException, PackingException, UpdateDryRunResponse {
+ LOG.fine(String.format("updateTopologyHandler called for %s with %s",
+ topologyName, newParallelism));
+ Map<String, Integer> changeRequests = parseNewParallelismParam(newParallelism);
+
+ SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime);
+ TopologyAPI.Topology topology = manager.getTopology(topologyName);
+ PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName);
+
+ if (!parallelismChangeDetected(currentPlan, changeRequests)) {
+ throw new TopologyRuntimeManagementException(
+ String.format("The component parallelism request (%s) is the same as the "
+ + "current topology parallelism. Not taking action.", newParallelism));
+ }
+
+ PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
+ null, topology);
+
+ sendUpdateRequest(topology, changeRequests, currentPlan, proposedPlan);
+ }
+
+ @VisibleForTesting
+ void updateTopologyContainerCount(String topologyName,
+ String newContainerNumber,
+ String newParallelism)
+ throws PackingException, UpdateDryRunResponse {
+ LOG.fine(String.format("updateTopologyHandler called for %s with %s and %s",
+ topologyName, newContainerNumber, newParallelism));
+ Integer containerNum = Integer.parseInt(newContainerNumber);
+ Map<String, Integer> changeRequests = new HashMap<String, Integer>();
+ if (newParallelism != null && !newParallelism.isEmpty()) {
+ changeRequests = parseNewParallelismParam(newParallelism);
+ }
+
+ SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime);
+ TopologyAPI.Topology topology = manager.getTopology(topologyName);
+ PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName);
+
+ if (!containersNumChangeDetected(currentPlan, containerNum)
+ && !parallelismChangeDetected(currentPlan, changeRequests)) {
+ throw new TopologyRuntimeManagementException(
+ String.format("Both component parallelism request and container number are the "
+ + "same as in the running topology."));
+ }
+
+ PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
+ containerNum, topology);
+
+ sendUpdateRequest(topology, changeRequests, currentPlan, proposedPlan);
}
@VisibleForTesting
@@ -368,7 +398,7 @@ public class RuntimeManagerRunner {
@VisibleForTesting
PackingPlans.PackingPlan buildNewPackingPlan(PackingPlans.PackingPlan currentProtoPlan,
Map<String, Integer> changeRequests,
- int containerNum,
+ Integer containerNum,
TopologyAPI.Topology topology)
throws PackingException {
PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
@@ -392,7 +422,12 @@ public class RuntimeManagerRunner {
LOG.info("Updating packing plan using " + repackingClass);
try {
packing.initialize(config, topology);
- PackingPlan packedPlan = packing.repack(currentPackingPlan, containerNum, componentChanges);
+ PackingPlan packedPlan = null;
+ if (containerNum == null) {
+ packedPlan = packing.repack(currentPackingPlan, componentChanges);
+ } else {
+ packedPlan = packing.repack(currentPackingPlan, containerNum, componentChanges);
+ }
return serializer.toProto(packedPlan);
} finally {
SysUtils.closeIgnoringExceptions(packing);
diff --git a/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java b/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java
index 5e635ab..d48bc17 100644
--- a/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java
+++ b/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java
@@ -236,7 +236,7 @@ public class RuntimeManagerRunnerTest {
when(manager.getPackingPlan(eq(TOPOLOGY_NAME))).thenReturn(currentPlan);
doReturn(proposedPlan).when(runner).buildNewPackingPlan(
- eq(currentPlan), eq(changeRequests), 1, any(TopologyAPI.Topology.class));
+ eq(currentPlan), eq(changeRequests), any(TopologyAPI.Topology.class));
Scheduler.UpdateTopologyRequest updateTopologyRequest =
Scheduler.UpdateTopologyRequest.newBuilder()
@@ -246,7 +246,7 @@ public class RuntimeManagerRunnerTest {
when(client.updateTopology(updateTopologyRequest)).thenReturn(true);
try {
- runner.updatePackingPlan(TOPOLOGY_NAME, 1, changeRequests);
+ runner.updateTopologyComponentParallelism(TOPOLOGY_NAME, newParallelism);
} finally {
int expectedClientUpdateCalls = expectedResult ? 1 : 0;
verify(client, times(expectedClientUpdateCalls)).updateTopology(updateTopologyRequest);