Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2018/02/14 14:11:25 UTC

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/5487

    [FLINK-8656] [flip6] Add modify CLI command to rescale Flink jobs

    ## What is the purpose of the change
    
    Jobs can now be rescaled by calling `flink modify <JOB_ID> -p <PARALLELISM>`.
    Internally, the CliFrontend will send the corresponding REST call and poll
    for status updates.
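The trigger-then-poll flow described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual `CliFrontend`/`RestClusterClient` code: `AsyncOperationApi`, `trigger`, `status` and `TriggerAndPoll` are hypothetical names, the REST calls are abstracted behind an interface, and a fixed poll interval is assumed.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical client-side view of an asynchronous REST operation. */
interface AsyncOperationApi<R> {
    /** Triggers the operation and returns its trigger id. */
    String trigger();

    /** Returns the result once available, or empty while the operation is still running. */
    Optional<R> status(String triggerId);
}

final class TriggerAndPoll {
    private TriggerAndPoll() {}

    /** Triggers the operation once, then polls its status until a result is available. */
    static <R> CompletableFuture<R> run(
            AsyncOperationApi<R> api, ScheduledExecutorService scheduler, long pollIntervalMillis) {
        final String triggerId = api.trigger();
        final CompletableFuture<R> result = new CompletableFuture<>();
        poll(api, triggerId, scheduler, pollIntervalMillis, result);
        return result;
    }

    private static <R> void poll(
            AsyncOperationApi<R> api, String triggerId, ScheduledExecutorService scheduler,
            long pollIntervalMillis, CompletableFuture<R> result) {
        try {
            Optional<R> status = api.status(triggerId);
            if (status.isPresent()) {
                result.complete(status.get());
            } else {
                // not finished yet: schedule the next status request
                scheduler.schedule(
                    () -> poll(api, triggerId, scheduler, pollIntervalMillis, result),
                    pollIntervalMillis, TimeUnit.MILLISECONDS);
            }
        } catch (RuntimeException e) {
            // a failing status request fails the whole operation
            result.completeExceptionally(e);
        }
    }
}
```

A real client would probably also bound the number of polls or the total waiting time; that is omitted here.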
    
    This PR is based on #5454.
    
    ## Brief change log
    
    - Add `modify` call to `CliFrontend`
    - Add `ClusterClient#rescaleJob` method with default implementation
    - Implement `RestClusterClient#rescaleJob` method to trigger the asynchronous rescale operation via REST and poll for its status updates
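One way to read "default implementation" is that the base client rejects the call and only the REST-based client overrides it. The sketch below assumes exactly that; `BaseClusterClient`, `RestBasedClusterClient`, `LegacyClusterClient` and the `rescaleJob(String, int)` signature are hypothetical and may differ from the actual Flink classes.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical base client: rescaling is unsupported unless a subclass overrides it. */
abstract class BaseClusterClient {
    CompletableFuture<Void> rescaleJob(String jobId, int newParallelism) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new UnsupportedOperationException(
            getClass().getSimpleName() + " does not support rescaling."));
        return future;
    }
}

/** Hypothetical REST-based client that actually implements rescaling. */
class RestBasedClusterClient extends BaseClusterClient {
    @Override
    CompletableFuture<Void> rescaleJob(String jobId, int newParallelism) {
        // a real client would trigger the REST operation and poll for its status here
        return CompletableFuture.completedFuture(null);
    }
}

/** Hypothetical client that keeps the default (unsupported) behaviour. */
class LegacyClusterClient extends BaseClusterClient {}
```

Failing the returned future, rather than throwing, keeps the error path asynchronous like the success path.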
    
    ## Verifying this change
    
    - Tested manually
    - Added `CliFrontendModifyTest`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (JavaDocs + stdout help)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink rescaleCommand

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5487
    
----
commit d9159228091cae9ebbd1bb718b69e6cf452881e1
Author: Till Rohrmann <tr...@...>
Date:   2018-02-13T11:41:44Z

    [FLINK-8643] [flip6] Use JobManagerOptions#SLOT_REQUEST_TIMEOUT in ExecutionGraph
    
    This commit changes the initialization of the ExecutionGraph to use the
    JobManagerOptions#SLOT_REQUEST_TIMEOUT for the slot allocation. Furthermore,
    it changes the behaviour of the SlotPool#ProviderAndOwner implementation such
    that the timeout is given to it via the SlotProvider#allocateSlot call.
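The difference between storing a timeout in the slot provider and passing it with every request can be illustrated with a small sketch. `SlotProvider`, `PendingSlotProvider` and the `String` slot type are hypothetical simplifications, not Flink's interfaces: the caller (for example, code that reads a setting such as `JobManagerOptions#SLOT_REQUEST_TIMEOUT`) decides the timeout, and the provider merely enforces it with a `ScheduledExecutorService`.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical provider: the caller supplies the timeout with every request. */
interface SlotProvider {
    CompletableFuture<String> allocateSlot(String requestId, Duration allocationTimeout);
}

/** Hypothetical provider that never receives a slot, so every request times out. */
final class PendingSlotProvider implements SlotProvider {
    private final ScheduledExecutorService scheduler;

    PendingSlotProvider(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    @Override
    public CompletableFuture<String> allocateSlot(String requestId, Duration allocationTimeout) {
        CompletableFuture<String> slotFuture = new CompletableFuture<>();
        // fail the request if no slot arrives within the timeout chosen by the caller
        scheduler.schedule(
            () -> {
                slotFuture.completeExceptionally(
                    new TimeoutException("Slot request " + requestId + " timed out."));
            },
            allocationTimeout.toMillis(), TimeUnit.MILLISECONDS);
        return slotFuture;
    }
}
```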

commit 19780c9d284914ec51e92231536315299a3c2da3
Author: Till Rohrmann <tr...@...>
Date:   2018-02-13T12:18:01Z

    [hotfix] [flip6] Remove unnecessary timeout from SlotPool

commit 9924776c92a378cef144c0767f1ff18b799d52e9
Author: Till Rohrmann <tr...@...>
Date:   2018-02-13T14:33:11Z

    [FLINK-8647] [flip6] Introduce JobMasterConfiguration
    
    This commit introduces a JobMasterConfiguration which contains JobMaster specific
    configuration settings.
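A configuration holder of this kind might look like the sketch below. The getters mirror the ones the `JobMaster` diff in this thread calls (`getSlotRequestTimeout`, `getTmpDirectory`, `getConfiguration`); the `Map`-based input, the `example.*` keys and the defaults are hypothetical, chosen only for illustration, and are not Flink's actual options.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical immutable holder for JobMaster-specific settings. */
final class JobMasterConfig {
    private final Duration rpcTimeout;
    private final Duration slotRequestTimeout;
    private final String tmpDirectory;
    private final Map<String, String> configuration;

    JobMasterConfig(Duration rpcTimeout, Duration slotRequestTimeout, String tmpDirectory,
                    Map<String, String> configuration) {
        this.rpcTimeout = rpcTimeout;
        this.slotRequestTimeout = slotRequestTimeout;
        this.tmpDirectory = tmpDirectory;
        // defensive, read-only copy of the raw settings
        this.configuration = Collections.unmodifiableMap(new HashMap<>(configuration));
    }

    /** Parses the settings once; keys and defaults are illustrative only. */
    static JobMasterConfig fromConfiguration(Map<String, String> config) {
        Duration rpcTimeout = Duration.ofMillis(
            Long.parseLong(config.getOrDefault("example.rpc-timeout-ms", "10000")));
        Duration slotRequestTimeout = Duration.ofMillis(
            Long.parseLong(config.getOrDefault("example.slot-request-timeout-ms", "300000")));
        String tmpDirectory = config.getOrDefault("example.tmp-dir", System.getProperty("java.io.tmpdir"));
        return new JobMasterConfig(rpcTimeout, slotRequestTimeout, tmpDirectory, config);
    }

    Duration getRpcTimeout() { return rpcTimeout; }
    Duration getSlotRequestTimeout() { return slotRequestTimeout; }
    String getTmpDirectory() { return tmpDirectory; }
    Map<String, String> getConfiguration() { return configuration; }
}
```

Parsing once into typed fields means the `JobMaster` does not have to re-read raw keys wherever a setting is needed.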

commit fde75841de2e27cb7380f3a28066a99e2c1a690d
Author: zentol <ch...@...>
Date:   2018-01-23T12:50:32Z

    [FLINK-8475][config][docs] Integrate HA-ZK options
    
    This closes #5462.

commit 788a17fdbd4aaf3429ead4491ede197fc775b1f0
Author: zentol <ch...@...>
Date:   2018-01-23T13:04:36Z

    [FLINK-8475][config][docs] Integrate YARN options
    
    This closes #5463.

commit fcd783358c282e61bf12e0c18298c237c85a6695
Author: Till Rohrmann <tr...@...>
Date:   2018-02-13T15:08:38Z

    [hotfix] [tests] Simplify JobMasterTest

commit 8206e6f13809c0b60bfaf776bc386088f535e723
Author: Till Rohrmann <tr...@...>
Date:   2018-02-13T15:10:09Z

    [FLINK-8546] [flip6] Respect savepoints and restore from latest checkpoints
    
    Let the JobMaster respect checkpoints and savepoints. The JobMaster will always
    try to restore the latest checkpoint if there is one available. Next it will check
    whether savepoint restore settings have been set. If so, then it will try to restore
    the savepoint. Only if these settings are not set will the job be started from
    scratch.
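The restore order described in this commit can be condensed into a small decision function. This sketch reads "Next" as a fallback (the savepoint is only considered when no checkpoint is available), represents checkpoints and savepoints as plain path strings, and uses a hypothetical `RestoreSource` class; it is not the `JobMaster` code.

```java
import java.util.Optional;

/** Hypothetical outcome of the restore decision. */
final class RestoreSource {
    enum Kind { LATEST_CHECKPOINT, SAVEPOINT, NONE }

    final Kind kind;
    final String path; // null when the job starts from scratch

    private RestoreSource(Kind kind, String path) {
        this.kind = kind;
        this.path = path;
    }

    /** Prefers the latest checkpoint, then the configured savepoint, else starts from scratch. */
    static RestoreSource choose(Optional<String> latestCheckpoint, Optional<String> savepointSetting) {
        if (latestCheckpoint.isPresent()) {
            return new RestoreSource(Kind.LATEST_CHECKPOINT, latestCheckpoint.get());
        }
        if (savepointSetting.isPresent()) {
            return new RestoreSource(Kind.SAVEPOINT, savepointSetting.get());
        }
        return new RestoreSource(Kind.NONE, null);
    }
}
```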

commit 057a95b7328b1cca7b78bf1dd25e8d048df70410
Author: Till Rohrmann <tr...@...>
Date:   2018-02-13T15:11:37Z

    [hotfix] Fix checkstyle violations in ExecutionGraph

commit 9930b0991320bcff268ca82db6378df8976560dc
Author: Till Rohrmann <tr...@...>
Date:   2018-02-13T15:12:41Z

    [FLINK-8627] Introduce new JobStatus#SUSPENDING to ExecutionGraph
    
    The new JobStatus#SUSPENDING says that an ExecutionGraph has been suspended but its
    cleanup has not been done yet. Only after all Executions have been canceled will the
    ExecutionGraph enter the SUSPENDED state and complete the termination future
    accordingly.
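The two-phase suspension can be sketched as follows: `suspend` only moves the graph to `SUSPENDING`, and the transition to `SUSPENDED` (plus completion of the termination future) happens once the last execution reports that it was canceled. `SuspendableGraph` and its methods are hypothetical simplifications of the `ExecutionGraph` logic, and the sketch assumes all calls come from a single thread, so it uses no synchronization.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, single-threaded graph illustrating SUSPENDING vs. SUSPENDED. */
final class SuspendableGraph {
    enum Status { RUNNING, SUSPENDING, SUSPENDED }

    private Status status = Status.RUNNING;
    private int runningExecutions;
    private final CompletableFuture<Status> terminationFuture = new CompletableFuture<>();

    SuspendableGraph(int numExecutions) {
        this.runningExecutions = numExecutions;
    }

    /** Starts the suspension; the executions are canceled asynchronously elsewhere. */
    void suspend() {
        if (status == Status.RUNNING) {
            status = Status.SUSPENDING;
            if (runningExecutions == 0) {
                finishSuspension(); // nothing left to cancel
            }
        }
    }

    /** Called whenever one execution has been canceled. */
    void executionCanceled() {
        runningExecutions--;
        if (runningExecutions == 0 && status == Status.SUSPENDING) {
            finishSuspension();
        }
    }

    private void finishSuspension() {
        status = Status.SUSPENDED;
        terminationFuture.complete(Status.SUSPENDED);
    }

    Status getStatus() { return status; }

    CompletableFuture<Status> getTerminationFuture() { return terminationFuture; }
}
```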

commit 6c51ad306c90464572353168ecafdb962794747e
Author: Till Rohrmann <tr...@...>
Date:   2018-02-13T15:14:41Z

    [FLINK-8629] [flip6] Allow JobMaster to rescale jobs
    
    This commit adds to the JobMaster the functionality to rescale a job or parts of
    it. In order to rescale a job, the JobMaster does the following:
    1. Take a savepoint
    2. Create a rescaled ExecutionGraph from the JobGraph
    3. Initialize it with the taken savepoint
    4. Suspend the old ExecutionGraph
    5. Restart the new ExecutionGraph once the old ExecutionGraph has been suspended
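The five steps can be chained with `CompletableFuture`, roughly as below. This is a simplified sketch, not the `JobMaster` implementation: `Graph`, `RescaleSteps`, `takeSavepoint` and the fixed savepoint path are hypothetical, and error handling (such as restarting the checkpoint scheduler on failure or disposing of the savepoint) is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for an ExecutionGraph. */
final class Graph {
    final int parallelism;
    String restoredFrom;      // savepoint path used to initialize this graph
    String state = "CREATED";

    Graph(int parallelism) {
        this.parallelism = parallelism;
    }

    void start() {
        state = "RUNNING";
    }

    CompletableFuture<String> suspend() {
        state = "SUSPENDED";
        return CompletableFuture.completedFuture(state);
    }
}

final class RescaleSteps {
    final List<String> log = new ArrayList<>();

    CompletableFuture<String> takeSavepoint(Graph current) {
        log.add("savepoint");
        return CompletableFuture.completedFuture("/tmp/savepoint-1");
    }

    /** Runs the five steps in order and returns the new, running graph. */
    CompletableFuture<Graph> rescale(Graph current, int newParallelism) {
        return takeSavepoint(current)                             // 1. take a savepoint
            .thenApply(savepointPath -> {
                Graph rescaled = new Graph(newParallelism);       // 2. create the rescaled graph
                rescaled.restoredFrom = savepointPath;            // 3. initialize it with the savepoint
                log.add("restore");
                return rescaled;
            })
            .thenCompose(rescaled -> current.suspend()            // 4. suspend the old graph
                .thenApply(status -> {
                    if (!"SUSPENDED".equals(status)) {
                        throw new IllegalStateException("Could not suspend the old graph.");
                    }
                    log.add("suspend");
                    rescaled.start();                             // 5. start the new graph
                    log.add("start");
                    return rescaled;
                }));
    }
}
```

Building the new graph before suspending the old one means a failure in steps 1 to 3 leaves the running job untouched.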

commit 990aa95040264ffd3c3ab20638cb8212af68d155
Author: Till Rohrmann <tr...@...>
Date:   2018-02-13T15:34:31Z

    [FLINK-8633] [flip6] Expose rescaling of jobs via the Dispatcher
    
    This commit exposes JobMaster#rescaleJob via the Dispatcher. This allows the
    functionality to be called from a REST handler.
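Exposing the operation through the Dispatcher essentially means looking up the `JobMaster` responsible for the job and forwarding the call. In the sketch below, `JobMasterLike`, `DispatcherSketch` and the `String` job ids are hypothetical; an unknown job id fails the returned future instead of throwing, which suits an asynchronous REST handler.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical gateway of a running job's master. */
interface JobMasterLike {
    CompletableFuture<Void> rescaleJob(int newParallelism);
}

/** Hypothetical dispatcher that routes rescale requests to the right job master. */
final class DispatcherSketch {
    private final Map<String, JobMasterLike> jobMasters = new HashMap<>();

    void register(String jobId, JobMasterLike jobMaster) {
        jobMasters.put(jobId, jobMaster);
    }

    CompletableFuture<Void> rescaleJob(String jobId, int newParallelism) {
        JobMasterLike jobMaster = jobMasters.get(jobId);
        if (jobMaster == null) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("Unknown job " + jobId + '.'));
            return failed;
        }
        // forward to the job master, which performs the actual rescaling
        return jobMaster.rescaleJob(newParallelism);
    }
}
```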

commit ca85f392c0caea751bf7805beb4428eba5cf851f
Author: Till Rohrmann <tr...@...>
Date:   2018-02-03T16:41:51Z

    [FLINK-8631] [rest] Add support for generic types to the RestClient
    
    This commit allows the RestClient to receive generic response types. In order
    to do this, the MessageHeaders now contain information about the generic
    type parameters of the response type.

commit 381099612b78ad1ab7bc5f239d2fd7c618c74222
Author: Till Rohrmann <tr...@...>
Date:   2018-02-02T10:06:35Z

    [FLINK-8634] [rest] Introduce job rescaling REST handler
    
    Add rescaling REST handler as a subclass of the
    AbstractAsynchronousOperationHandlers.

commit f4e9f71b4821dabfadc47491e3b4a67ef5e3dab3
Author: Till Rohrmann <tr...@...>
Date:   2018-02-02T10:23:00Z

    [FLINK-8632] [flip6] Introduce generalized asynchronous operation handlers
    
    The asynchronous operation handlers are the generalization of the SavepointHandlers.
    They consist of a Trigger- and a StatusHandler. The TriggerHandler is used to trigger
    an asynchronous operation. The handler stores the operation future and returns a
    trigger id. The trigger id can be used to query the status of the operation via the
    StatusHandler. Once the operation has completed, the StatusHandler will return the
    result.
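The trigger/status pattern can be sketched as a small registry keyed by trigger id. `AsyncOperationRegistry` and the use of random `UUID` strings as trigger ids are assumptions for illustration; the actual handlers are REST handlers with their own message types. The sketch also assumes operations produce non-null results.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical registry backing a trigger handler and a status handler. */
final class AsyncOperationRegistry<R> {
    private final Map<String, CompletableFuture<R>> operations = new ConcurrentHashMap<>();

    /** Trigger side: start the operation, store its future and return a trigger id. */
    String trigger(Supplier<CompletableFuture<R>> operation) {
        String triggerId = UUID.randomUUID().toString();
        operations.put(triggerId, operation.get());
        return triggerId;
    }

    /**
     * Status side: empty while the operation is running, otherwise its (non-null) result.
     * A failed operation rethrows its failure wrapped in a CompletionException.
     */
    Optional<R> status(String triggerId) {
        CompletableFuture<R> future = operations.get(triggerId);
        if (future == null) {
            throw new IllegalArgumentException("Unknown trigger id " + triggerId + '.');
        }
        return future.isDone() ? Optional.of(future.join()) : Optional.empty();
    }
}
```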

commit 79f20916a1366fed9caee27c6fcfe932d99c7482
Author: Till Rohrmann <tr...@...>
Date:   2018-02-11T18:50:46Z

    [FLINK-8635] [rest] Register rescaling handlers at web endpoint

commit 7081cba4cd3ba377b64a4d165e1fc888be99873f
Author: Till Rohrmann <tr...@...>
Date:   2018-02-13T16:29:32Z

    [FLINK-8656] [flip6] Add modify CLI command to rescale Flink jobs
    
    Jobs can now be rescaled by calling flink modify <JOB_ID> -p <PARALLELISM>.
    Internally, the CliFrontend will send the corresponding REST call and poll
    for status updates.

----


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r169917224
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -447,6 +453,165 @@ public void postStop() throws Exception {
     		return CompletableFuture.completedFuture(Acknowledge.get());
     	}
     
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleJob(
    +			int newParallelism,
    +			RescalingBehaviour rescalingBehaviour,
    +			Time timeout) {
    +		final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
    +
    +		for (JobVertex jobVertex : jobGraph.getVertices()) {
    +			allOperators.add(jobVertex.getID());
    +		}
    +
    +		return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
    +	}
    +
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleOperators(
    +			Collection<JobVertexID> operators,
    +			int newParallelism,
    +			RescalingBehaviour rescalingBehaviour,
    +			Time timeout) {
    +		// 1. Check whether we can rescale the job & rescale the respective vertices
    +		for (JobVertexID jobVertexId : operators) {
    +			final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
    +
    +			// update max parallelism in case that it has not been configure
    +			final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
    +
    +			if (executionJobVertex != null) {
    +				jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
    +			}
    +
    +			try {
    +				rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
    +			} catch (FlinkException e) {
    +				final String msg = String.format("Cannot rescale job %s.", jobGraph.getName());
    +
    +				log.info(msg, e);
    +
    +				return FutureUtils.completedExceptionally(
    +					new JobModificationException(msg, e));
    +			}
    +		}
    +
    +		final ExecutionGraph currentExecutionGraph = executionGraph;
    +
    +		final ExecutionGraph newExecutionGraph;
    +
    +		try {
    +			newExecutionGraph = ExecutionGraphBuilder.buildGraph(
    +				null,
    +				jobGraph,
    +				jobMasterConfiguration.getConfiguration(),
    +				scheduledExecutorService,
    +				scheduledExecutorService,
    +				slotPool.getSlotProvider(),
    +				userCodeLoader,
    +				highAvailabilityServices.getCheckpointRecoveryFactory(),
    +				rpcTimeout,
    +				currentExecutionGraph.getRestartStrategy(),
    +				jobMetricGroup,
    +				1,
    +				blobServer,
    +				jobMasterConfiguration.getSlotRequestTimeout(),
    +				log);
    +		} catch (JobExecutionException | JobException e) {
    +			return FutureUtils.completedExceptionally(
    +				new JobModificationException("Could not create rescaled ExecutionGraph.", e));
    +		}
    +
    +		// 3. disable checkpoint coordinator to suppress subsequent checkpoints
    +		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    +		checkpointCoordinator.stopCheckpointScheduler();
    +
    +		// 4. take a savepoint
    +		final CompletableFuture<String> savepointFuture = triggerSavepoint(
    +			jobMasterConfiguration.getTmpDirectory(),
    +			timeout);
    +
    +		final CompletableFuture<ExecutionGraph> executionGraphFuture = savepointFuture
    +			.thenApplyAsync(
    +				(String savepointPath) -> {
    +					try {
    +						newExecutionGraph.getCheckpointCoordinator().restoreSavepoint(
    +							savepointPath,
    +							false,
    +							newExecutionGraph.getAllVertices(),
    +							userCodeLoader);
    +					} catch (Exception e) {
    +						disposeSavepoint(savepointPath);
    +
    +						throw new CompletionException(new JobModificationException("Could not restore from temporary rescaling savepoint.", e));
    +					}
    +
    +					// delete the savepoint file once we reach a terminal state
    +					newExecutionGraph.getTerminationFuture()
    +						.whenCompleteAsync(
    +							(JobStatus jobStatus, Throwable throwable) -> disposeSavepoint(savepointPath),
    +							scheduledExecutorService);
    +
    +					return newExecutionGraph;
    +				}, scheduledExecutorService)
    +			.exceptionally(
    +				(Throwable failure) -> {
    +					// in case that we couldn't take a savepoint or restore from it, let's restart the checkpoint
    +					// coordinator and abort the rescaling operation
    +					if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
    --- End diff --
    
    The check could move inside `startCheckpointScheduler`
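Moving the check inside `startCheckpointScheduler` could look like the sketch below, so that callers such as the rescaling error path no longer need to test `isPeriodicCheckpointingConfigured()` themselves. `CoordinatorSketch` is a hypothetical, heavily simplified stand-in for `CheckpointCoordinator`, and the convention that `Long.MAX_VALUE` means "periodic checkpointing disabled" is an assumption of this sketch.

```java
/** Hypothetical, simplified checkpoint coordinator. */
final class CoordinatorSketch {
    // assumed convention: Long.MAX_VALUE means periodic checkpointing is disabled
    private final long baseInterval;
    private boolean periodicSchedulingActive;

    CoordinatorSketch(long baseInterval) {
        this.baseInterval = baseInterval;
    }

    boolean isPeriodicCheckpointingConfigured() {
        return baseInterval != Long.MAX_VALUE;
    }

    /** Starts periodic checkpoints; a no-op when periodic checkpointing is not configured. */
    void startCheckpointScheduler() {
        if (!isPeriodicCheckpointingConfigured()) {
            return;
        }
        periodicSchedulingActive = true;
        // a real coordinator would schedule the periodic checkpoint trigger here
    }

    void stopCheckpointScheduler() {
        periodicSchedulingActive = false;
    }

    boolean isPeriodicSchedulingActive() {
        return periodicSchedulingActive;
    }
}
```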


---

[GitHub] flink issue #5487: [FLINK-8656] [flip6] Add modify CLI command to rescale Fl...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on the issue:

    https://github.com/apache/flink/pull/5487
  
    I also had these comments, which were hidden:
    ![image](https://user-images.githubusercontent.com/1681921/36565125-d2dd1d52-181f-11e8-8dd7-ea25de6cacad.png)



---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r169632755
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster;
    +
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.function.BiConsumerWithException;
    +
    +/**
    + * Definition of the rescaling behaviour.
    + */
    +public enum RescalingBehaviour implements BiConsumerWithException<JobVertex, Integer, FlinkException> {
    +	// rescaling is only executed if the operator can be set to the given parallelism
    +	STRICT {
    +		@Override
    +		public void acceptWithException(JobVertex jobVertex, Integer newParallelism) throws FlinkException {
    +			if (jobVertex.getMaxParallelism() < newParallelism) {
    +				throw new FlinkException("Cannot rescale vertex " + jobVertex.getName() +
    +					" because its maximum parallelism " + jobVertex.getMaxParallelism() +
    +					" is smaller than the new parallelism " + newParallelism + '.');
    +			} else {
    +				jobVertex.setParallelism(newParallelism);
    +			}
    +		}
    +	},
    +	// the new parallelism will be the minimum of the given parallelism and the maximum parallelism
    +	RELAXED {
    --- End diff --
    
    Are you going to make this configurable in a follow up?
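For reference, both behaviours can be sketched in a self-contained way. `STRICT` mirrors the diff above; the `RELAXED` body is cut off in the excerpt, so it is written here from its comment (the minimum of the given parallelism and the maximum parallelism) and may differ from the actual code. `Vertex` and `RescaleMode` are hypothetical stand-ins for `JobVertex` and `RescalingBehaviour`, and `IllegalArgumentException` replaces `FlinkException` to keep the sketch dependency-free.

```java
/** Hypothetical stand-in for JobVertex. */
final class Vertex {
    final String name;
    final int maxParallelism;
    int parallelism;

    Vertex(String name, int maxParallelism, int parallelism) {
        this.name = name;
        this.maxParallelism = maxParallelism;
        this.parallelism = parallelism;
    }
}

/** Hypothetical stand-in for RescalingBehaviour. */
enum RescaleMode {
    // rescaling is only executed if the operator can be set to the given parallelism
    STRICT {
        @Override
        void apply(Vertex vertex, int newParallelism) {
            if (vertex.maxParallelism < newParallelism) {
                throw new IllegalArgumentException("Cannot rescale vertex " + vertex.name
                    + " because its maximum parallelism " + vertex.maxParallelism
                    + " is smaller than the new parallelism " + newParallelism + '.');
            }
            vertex.parallelism = newParallelism;
        }
    },
    // the new parallelism is the minimum of the given parallelism and the maximum parallelism
    RELAXED {
        @Override
        void apply(Vertex vertex, int newParallelism) {
            vertex.parallelism = Math.min(newParallelism, vertex.maxParallelism);
        }
    };

    abstract void apply(Vertex vertex, int newParallelism);
}
```

If the behaviour were made configurable, a user-supplied string could be mapped with `RescaleMode.valueOf(...)`; this is only one possible approach.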


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r169911850
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -447,6 +453,165 @@ public void postStop() throws Exception {
     		return CompletableFuture.completedFuture(Acknowledge.get());
     	}
     
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleJob(
    +			int newParallelism,
    +			RescalingBehaviour rescalingBehaviour,
    +			Time timeout) {
    +		final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
    +
    +		for (JobVertex jobVertex : jobGraph.getVertices()) {
    +			allOperators.add(jobVertex.getID());
    +		}
    +
    +		return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
    +	}
    +
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleOperators(
    --- End diff --
    
    Does it have to be part of the `JobMasterGateway` interface? It's only used from `rescaleJob`.


---

[GitHub] flink issue #5487: [FLINK-8656] [flip6] Add modify CLI command to rescale Fl...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on the issue:

    https://github.com/apache/flink/pull/5487
  
    After rescaling `SocketWindowWordCount`, the parallelism exposed by the REST API remains unchanged.
    
    Steps to reproduce:
    
    Start netcat.
    ```
    nc -l 9001
    ```
    Submit job with parallelism 1.
    ```
    bin/flink run -p1 examples/streaming/SocketWindowWordCount.jar --port 9001
    ```
    
    Rescale to parallelism 10.
    ```
    bin/flink modify 4f9e368f973be3780b75e49e50168fcc -p 10
    ```
    
    Check parallelism:
    ```
    curl localhost:9065/jobs/4f9e368f973be3780b75e49e50168fcc/config | jq .
    {
      "jid": "4f9e368f973be3780b75e49e50168fcc",
      "name": "Socket Window WordCount",
      "execution-config": {
        "execution-mode": "PIPELINED",
        "restart-strategy": "default",
        "job-parallelism": 1,
        "object-reuse-mode": false,
        "user-config": {}
      }
    }
    ```


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r169920801
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.job;
    +
    +import org.apache.flink.runtime.rest.messages.JobMessageParameters;
    +import org.apache.flink.runtime.rest.messages.MessagePathParameter;
    +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +
    +/**
    + * Message headers for the {@link RescalingHandlers.RescalingStatusHandler}.
    + */
    +public class RescalingStatusMessageParameters extends JobMessageParameters {
    +
    +	public final TriggerIdPathParameter triggerIdPathParameter = new TriggerIdPathParameter();
    +
    +	@Override
    +	public Collection<MessagePathParameter<?>> getPathParameters() {
    +		return Arrays.asList(jobPathParameter, triggerIdPathParameter);
    --- End diff --
    
    It's not very important, but the `List` returned by `Arrays.asList` is mutable (it is fixed-size, but `set` still works):
    
    ```
    	public static void main(String[] args) {
    		final List<String> strings = Arrays.asList("0", "1", "2");
    		strings.set(2, "3");
    
    		System.out.println(strings);
    		System.out.flush();
    
    		final List<String> strings2 = Collections.unmodifiableList(strings);
    		strings2.set(2, "2");
    	}
    ```
    
    output
    ```
    [0, 1, 3]
    Exception in thread "main" java.lang.UnsupportedOperationException
    	at java.util.Collections$UnmodifiableList.set(Collections.java:1311)
    	at org.apache.flink.runtime.dispatcher.Dispatcher.main(Dispatcher.java:347)
    ```
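A self-contained version of the example, together with one way to return a read-only list from `getPathParameters`-style methods, is sketched below. `PathParameters` and its string elements are hypothetical stand-ins for the message parameter classes.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class PathParameters {
    // hypothetical values standing in for the path parameter objects
    private final String jobPathParameter = "jobid";
    private final String triggerIdPathParameter = "triggerid";

    /** Same shape as the code under review: fixed-size, but set(...) still succeeds. */
    List<String> asListView() {
        return Arrays.asList(jobPathParameter, triggerIdPathParameter);
    }

    /** Read-only alternative: every mutator throws UnsupportedOperationException. */
    List<String> readOnly() {
        return Collections.unmodifiableList(Arrays.asList(jobPathParameter, triggerIdPathParameter));
    }
}
```

On Java 9 and later, `List.of(...)` would give the same read-only guarantee (and additionally rejects `null` elements).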


---

[GitHub] flink issue #5487: [FLINK-8656] [flip6] Add modify CLI command to rescale Fl...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5487
  
    Thanks for the review @GJL. I've addressed your comments and rebased onto the latest master.


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r169919835
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +/**
    + * Parallelism for the rescaling of jobs specified as a {@link MessageQueryParameter}.
    + */
    +public class RescaleParallelismQueryParameter extends MessageQueryParameter<Integer> {
    +
    +	public static final String KEY = "parallelism";
    +
    +	protected RescaleParallelismQueryParameter() {
    +		super(KEY, MessageParameterRequisiteness.MANDATORY);
    +	}
    +
    +	@Override
    +	public Integer convertValueFromString(String value) {
    +		return Integer.valueOf(value);
    --- End diff --
    
    We could already validate here that the integer is greater than 0.
    There could also be validation in `Dispatcher#rescaleJob` and `JobMaster#rescaleJob`.
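Validating at parse time could look like the sketch below. `ParallelismParser` is a hypothetical standalone stand-in for `RescaleParallelismQueryParameter#convertValueFromString`, and signalling errors with `IllegalArgumentException` is an assumption; the real parameter classes may report conversion errors differently.

```java
final class ParallelismParser {
    private ParallelismParser() {}

    /** Parses the "parallelism" query parameter and rejects non-positive values. */
    static int convertValueFromString(String value) {
        final int parallelism;
        try {
            parallelism = Integer.parseInt(value);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Parallelism must be an integer, was '" + value + "'.", e);
        }
        if (parallelism <= 0) {
            throw new IllegalArgumentException("Parallelism must be greater than 0, was " + parallelism + '.');
        }
        return parallelism;
    }
}
```

Rejecting bad input at the REST boundary gives the user an immediate error, while checks in the Dispatcher and JobMaster would still protect callers that bypass REST.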


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r169912256
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -447,6 +453,165 @@ public void postStop() throws Exception {
     		return CompletableFuture.completedFuture(Acknowledge.get());
     	}
     
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleJob(
    +			int newParallelism,
    +			RescalingBehaviour rescalingBehaviour,
    +			Time timeout) {
    +		final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
    +
    +		for (JobVertex jobVertex : jobGraph.getVertices()) {
    +			allOperators.add(jobVertex.getID());
    +		}
    +
    +		return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
    +	}
    +
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleOperators(
    +			Collection<JobVertexID> operators,
    +			int newParallelism,
    +			RescalingBehaviour rescalingBehaviour,
    +			Time timeout) {
    +		// 1. Check whether we can rescale the job & rescale the respective vertices
    +		for (JobVertexID jobVertexId : operators) {
    +			final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
    +
    +			// update max parallelism in case that it has not been configure
    --- End diff --
    
    nit: *configured*


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r170014907
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -447,6 +453,165 @@ public void postStop() throws Exception {
     		return CompletableFuture.completedFuture(Acknowledge.get());
     	}
     
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleJob(
    +			int newParallelism,
    +			RescalingBehaviour rescalingBehaviour,
    +			Time timeout) {
    +		final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
    +
    +		for (JobVertex jobVertex : jobGraph.getVertices()) {
    +			allOperators.add(jobVertex.getID());
    +		}
    +
    +		return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
    +	}
    +
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleOperators(
    +			Collection<JobVertexID> operators,
    +			int newParallelism,
    +			RescalingBehaviour rescalingBehaviour,
    +			Time timeout) {
    +		// 1. Check whether we can rescale the job & rescale the respective vertices
    +		for (JobVertexID jobVertexId : operators) {
    +			final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
    +
    +			// update max parallelism in case that it has not been configure
    +			final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
    +
    +			if (executionJobVertex != null) {
    +				jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
    +			}
    +
    +			try {
    +				rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
    +			} catch (FlinkException e) {
    +				final String msg = String.format("Cannot rescale job %s.", jobGraph.getName());
    +
    +				log.info(msg, e);
    +
    +				return FutureUtils.completedExceptionally(
    +					new JobModificationException(msg, e));
    +			}
    +		}
    +
    +		final ExecutionGraph currentExecutionGraph = executionGraph;
    +
    +		final ExecutionGraph newExecutionGraph;
    +
    +		try {
    +			newExecutionGraph = ExecutionGraphBuilder.buildGraph(
    +				null,
    +				jobGraph,
    +				jobMasterConfiguration.getConfiguration(),
    +				scheduledExecutorService,
    +				scheduledExecutorService,
    +				slotPool.getSlotProvider(),
    +				userCodeLoader,
    +				highAvailabilityServices.getCheckpointRecoveryFactory(),
    +				rpcTimeout,
    +				currentExecutionGraph.getRestartStrategy(),
    +				jobMetricGroup,
    +				1,
    +				blobServer,
    +				jobMasterConfiguration.getSlotRequestTimeout(),
    +				log);
    +		} catch (JobExecutionException | JobException e) {
    +			return FutureUtils.completedExceptionally(
    +				new JobModificationException("Could not create rescaled ExecutionGraph.", e));
    +		}
    +
    +		// 3. disable checkpoint coordinator to suppress subsequent checkpoints
    +		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    +		checkpointCoordinator.stopCheckpointScheduler();
    +
    +		// 4. take a savepoint
    +		final CompletableFuture<String> savepointFuture = triggerSavepoint(
    +			jobMasterConfiguration.getTmpDirectory(),
    +			timeout);
    +
    +		final CompletableFuture<ExecutionGraph> executionGraphFuture = savepointFuture
    +			.thenApplyAsync(
    +				(String savepointPath) -> {
    +					try {
    +						newExecutionGraph.getCheckpointCoordinator().restoreSavepoint(
    +							savepointPath,
    +							false,
    +							newExecutionGraph.getAllVertices(),
    +							userCodeLoader);
    +					} catch (Exception e) {
    +						disposeSavepoint(savepointPath);
    +
    +						throw new CompletionException(new JobModificationException("Could not restore from temporary rescaling savepoint.", e));
    +					}
    +
    +					// delete the savepoint file once we reach a terminal state
    +					newExecutionGraph.getTerminationFuture()
    +						.whenCompleteAsync(
    +							(JobStatus jobStatus, Throwable throwable) -> disposeSavepoint(savepointPath),
    +							scheduledExecutorService);
    +
    +					return newExecutionGraph;
    +				}, scheduledExecutorService)
    +			.exceptionally(
    +				(Throwable failure) -> {
    +					// in case that we couldn't take a savepoint or restore from it, let's restart the checkpoint
    +					// coordinator and abort the rescaling operation
    +					if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
    +						checkpointCoordinator.startCheckpointScheduler();
    +					}
    +
    +					throw new CompletionException(failure);
    +				});
    +
    +		// 5. suspend the current job
    +		final CompletableFuture<JobStatus> terminationFuture = executionGraphFuture.thenComposeAsync(
    +			(ExecutionGraph ignored) -> {
    +				currentExecutionGraph.suspend(new FlinkException("Job is being rescaled."));
    +				return currentExecutionGraph.getTerminationFuture();
    +			},
    +			getMainThreadExecutor());
    +
    +		final CompletableFuture<Void> suspendedFuture = terminationFuture.thenAccept(
    +			(JobStatus jobStatus) -> {
    +				if (jobStatus != JobStatus.SUSPENDED) {
    +					final String msg = String.format("Job %s rescaling failed because we could not suspend the execution graph.", jobGraph.getName());
    +					log.info(msg);
    +					throw new CompletionException(new JobModificationException(msg));
    +				}
    +			});
    +
    +		// 6. resume the new execution graph from the taken savepoint
    +		final CompletableFuture<Acknowledge> rescalingFuture = suspendedFuture.thenCombineAsync(
    +			executionGraphFuture,
    +			(Void ignored, ExecutionGraph restoredExecutionGraph) -> {
    +				// check if the ExecutionGraph is still the same
    +				//noinspection ObjectEquality
    --- End diff --
    
    There is. The `a == b` comparison triggers it, but here we want to check for referential identity.
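The distinction matters because `equals` may consider two distinct objects equal, while `==` asks whether both references point to the very same instance. The rescaling code wants the latter: the restored graph should only be installed if the current graph is still the exact instance that was rescaled. A minimal illustration with a hypothetical `GraphRef` class whose `equals` compares only job names:

```java
import java.util.Objects;

/** Hypothetical value-like class: equal whenever the job names match. */
final class GraphRef {
    final String jobName;

    GraphRef(String jobName) {
        this.jobName = jobName;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof GraphRef && ((GraphRef) o).jobName.equals(jobName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(jobName);
    }

    /** Only proceed if the current graph is still the exact instance we started from. */
    static boolean isStillCurrent(GraphRef current, GraphRef rescaledFrom) {
        //noinspection ObjectEquality
        return current == rescaledFrom;
    }
}
```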


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r170016414
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.job;
    +
    +import org.apache.flink.runtime.rest.messages.JobMessageParameters;
    +import org.apache.flink.runtime.rest.messages.MessagePathParameter;
    +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +
    +/**
    + * Message headers for the {@link RescalingHandlers.RescalingStatusHandler}.
    + */
    +public class RescalingStatusMessageParameters extends JobMessageParameters {
    +
    +	public final TriggerIdPathParameter triggerIdPathParameter = new TriggerIdPathParameter();
    +
    +	@Override
    +	public Collection<MessagePathParameter<?>> getPathParameters() {
    +		return Arrays.asList(jobPathParameter, triggerIdPathParameter);
    --- End diff --
    
    True, there will be other places to fix as well. I'll leave it for a follow-up task.


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r169916387
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -447,6 +453,165 @@ public void postStop() throws Exception {
     		return CompletableFuture.completedFuture(Acknowledge.get());
     	}
     
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleJob(
    +			int newParallelism,
    +			RescalingBehaviour rescalingBehaviour,
    +			Time timeout) {
    +		final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
    +
    +		for (JobVertex jobVertex : jobGraph.getVertices()) {
    +			allOperators.add(jobVertex.getID());
    +		}
    +
    +		return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
    +	}
    +
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleOperators(
    +			Collection<JobVertexID> operators,
    +			int newParallelism,
    +			RescalingBehaviour rescalingBehaviour,
    +			Time timeout) {
    +		// 1. Check whether we can rescale the job & rescale the respective vertices
    +		for (JobVertexID jobVertexId : operators) {
    +			final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
    +
    +			// update max parallelism in case that it has not been configure
    +			final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
    +
    +			if (executionJobVertex != null) {
    +				jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
    +			}
    +
    +			try {
    +				rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
    +			} catch (FlinkException e) {
    +				final String msg = String.format("Cannot rescale job %s.", jobGraph.getName());
    +
    +				log.info(msg, e);
    +
    +				return FutureUtils.completedExceptionally(
    +					new JobModificationException(msg, e));
    +			}
    +		}
    +
    +		final ExecutionGraph currentExecutionGraph = executionGraph;
    +
    +		final ExecutionGraph newExecutionGraph;
    +
    +		try {
    +			newExecutionGraph = ExecutionGraphBuilder.buildGraph(
    +				null,
    +				jobGraph,
    +				jobMasterConfiguration.getConfiguration(),
    +				scheduledExecutorService,
    +				scheduledExecutorService,
    +				slotPool.getSlotProvider(),
    +				userCodeLoader,
    +				highAvailabilityServices.getCheckpointRecoveryFactory(),
    +				rpcTimeout,
    +				currentExecutionGraph.getRestartStrategy(),
    +				jobMetricGroup,
    +				1,
    +				blobServer,
    +				jobMasterConfiguration.getSlotRequestTimeout(),
    +				log);
    +		} catch (JobExecutionException | JobException e) {
    +			return FutureUtils.completedExceptionally(
    +				new JobModificationException("Could not create rescaled ExecutionGraph.", e));
    +		}
    +
    +		// 3. disable checkpoint coordinator to suppress subsequent checkpoints
    +		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    +		checkpointCoordinator.stopCheckpointScheduler();
    +
    +		// 4. take a savepoint
    +		final CompletableFuture<String> savepointFuture = triggerSavepoint(
    +			jobMasterConfiguration.getTmpDirectory(),
    --- End diff --
    
    Why is it ok to use a tmp directory as the target directory? Shouldn't the directory be visible from all hosts?


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r170014030
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.util.function;
    +
    +import java.util.function.BiConsumer;
    +
    +/**
    + * A checked extension of the {@link BiConsumer} interface.
    + *
    + * @param <T> type of the first argument
    + * @param <U> type of the second argument
    + * @param <E> type of the thrown exception
    + */
    +@FunctionalInterface
    +public interface BiConsumerWithException<T, U, E extends Throwable> extends BiConsumer<T, U> {
    +
    +	/**
    +	 * Performs this operation on the given arguments.
    +	 *
    +	 * @param t the first input argument
    +	 * @param u the second input argument
    +	 * @throws E in case of an error
    +	 */
    +	void acceptWithException(T t, U u) throws E;
    +
    +	@Override
    +	default void accept(T t, U u) {
    +		try {
    +			acceptWithException(t, u);
    +		} catch (Throwable e) {
    --- End diff --
    
    True, will change it to `ExceptionUtils.rethrow`.
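The interface lets a lambda that throws a checked exception be used wherever a plain `BiConsumer` is expected. Below is a minimal, self-contained sketch of the default `accept` after the suggested change. `Rethrow` is a local helper that only approximates `ExceptionUtils.rethrow`: `Error`s and `RuntimeException`s are rethrown as-is, and checked exceptions are wrapped in a `RuntimeException`. The `Sketch` suffix marks the interface as illustrative.

```java
import java.util.function.BiConsumer;

/** Illustrative copy of the checked BiConsumer pattern (not the Flink class). */
@FunctionalInterface
interface BiConsumerWithExceptionSketch<T, U, E extends Throwable> extends BiConsumer<T, U> {

    /** The operation that may throw a checked exception of type E. */
    void acceptWithException(T t, U u) throws E;

    /** Bridges to BiConsumer by rethrowing failures instead of swallowing them. */
    @Override
    default void accept(T t, U u) {
        try {
            acceptWithException(t, u);
        } catch (Throwable e) {
            Rethrow.rethrow(e);
        }
    }
}

/** Local approximation of Flink's ExceptionUtils.rethrow(Throwable). */
final class Rethrow {
    private Rethrow() {}

    static void rethrow(Throwable t) {
        if (t instanceof Error) {
            throw (Error) t;
        }
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        }
        // Checked exceptions cannot pass through accept(), so wrap them.
        throw new RuntimeException(t);
    }
}
```

Compared to a generic wrapper, this keeps unchecked exceptions such as `IllegalStateException` recognizable to callers.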


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r170179289
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +/**
    + * Parallelism for the rescaling of jobs specified as a {@link MessageQueryParameter}.
    + */
    +public class RescaleParallelismQueryParameter extends MessageQueryParameter<Integer> {
    +
    +	public static final String KEY = "parallelism";
    +
    +	protected RescaleParallelismQueryParameter() {
    +		super(KEY, MessageParameterRequisiteness.MANDATORY);
    +	}
    +
    +	@Override
    +	public Integer convertValueFromString(String value) {
    +		return Integer.valueOf(value);
    --- End diff --
    
    ok


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r169918052
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -447,6 +453,165 @@ public void postStop() throws Exception {
     		return CompletableFuture.completedFuture(Acknowledge.get());
     	}
     
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleJob(
    +			int newParallelism,
    +			RescalingBehaviour rescalingBehaviour,
    +			Time timeout) {
    +		final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
    +
    +		for (JobVertex jobVertex : jobGraph.getVertices()) {
    +			allOperators.add(jobVertex.getID());
    +		}
    +
    +		return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
    +	}
    +
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleOperators(
    +			Collection<JobVertexID> operators,
    +			int newParallelism,
    +			RescalingBehaviour rescalingBehaviour,
    +			Time timeout) {
    +		// 1. Check whether we can rescale the job & rescale the respective vertices
    +		for (JobVertexID jobVertexId : operators) {
    +			final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
    +
    +			// update max parallelism in case that it has not been configure
    +			final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
    +
    +			if (executionJobVertex != null) {
    +				jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
    +			}
    +
    +			try {
    +				rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
    +			} catch (FlinkException e) {
    +				final String msg = String.format("Cannot rescale job %s.", jobGraph.getName());
    +
    +				log.info(msg, e);
    +
    +				return FutureUtils.completedExceptionally(
    +					new JobModificationException(msg, e));
    +			}
    +		}
    +
    +		final ExecutionGraph currentExecutionGraph = executionGraph;
    +
    +		final ExecutionGraph newExecutionGraph;
    +
    +		try {
    +			newExecutionGraph = ExecutionGraphBuilder.buildGraph(
    +				null,
    +				jobGraph,
    +				jobMasterConfiguration.getConfiguration(),
    +				scheduledExecutorService,
    +				scheduledExecutorService,
    +				slotPool.getSlotProvider(),
    +				userCodeLoader,
    +				highAvailabilityServices.getCheckpointRecoveryFactory(),
    +				rpcTimeout,
    +				currentExecutionGraph.getRestartStrategy(),
    +				jobMetricGroup,
    +				1,
    +				blobServer,
    +				jobMasterConfiguration.getSlotRequestTimeout(),
    +				log);
    +		} catch (JobExecutionException | JobException e) {
    +			return FutureUtils.completedExceptionally(
    +				new JobModificationException("Could not create rescaled ExecutionGraph.", e));
    +		}
    +
    +		// 3. disable checkpoint coordinator to suppress subsequent checkpoints
    +		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    +		checkpointCoordinator.stopCheckpointScheduler();
    +
    +		// 4. take a savepoint
    +		final CompletableFuture<String> savepointFuture = triggerSavepoint(
    +			jobMasterConfiguration.getTmpDirectory(),
    +			timeout);
    +
    +		final CompletableFuture<ExecutionGraph> executionGraphFuture = savepointFuture
    +			.thenApplyAsync(
    +				(String savepointPath) -> {
    +					try {
    +						newExecutionGraph.getCheckpointCoordinator().restoreSavepoint(
    +							savepointPath,
    +							false,
    +							newExecutionGraph.getAllVertices(),
    +							userCodeLoader);
    +					} catch (Exception e) {
    +						disposeSavepoint(savepointPath);
    +
    +						throw new CompletionException(new JobModificationException("Could not restore from temporary rescaling savepoint.", e));
    +					}
    +
    +					// delete the savepoint file once we reach a terminal state
    +					newExecutionGraph.getTerminationFuture()
    +						.whenCompleteAsync(
    +							(JobStatus jobStatus, Throwable throwable) -> disposeSavepoint(savepointPath),
    +							scheduledExecutorService);
    +
    +					return newExecutionGraph;
    +				}, scheduledExecutorService)
    +			.exceptionally(
    +				(Throwable failure) -> {
    +					// in case that we couldn't take a savepoint or restore from it, let's restart the checkpoint
    +					// coordinator and abort the rescaling operation
    +					if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
    +						checkpointCoordinator.startCheckpointScheduler();
    +					}
    +
    +					throw new CompletionException(failure);
    +				});
    +
    +		// 5. suspend the current job
    +		final CompletableFuture<JobStatus> terminationFuture = executionGraphFuture.thenComposeAsync(
    +			(ExecutionGraph ignored) -> {
    +				currentExecutionGraph.suspend(new FlinkException("Job is being rescaled."));
    +				return currentExecutionGraph.getTerminationFuture();
    +			},
    +			getMainThreadExecutor());
    +
    +		final CompletableFuture<Void> suspendedFuture = terminationFuture.thenAccept(
    +			(JobStatus jobStatus) -> {
    +				if (jobStatus != JobStatus.SUSPENDED) {
    +					final String msg = String.format("Job %s rescaling failed because we could not suspend the execution graph.", jobGraph.getName());
    +					log.info(msg);
    +					throw new CompletionException(new JobModificationException(msg));
    +				}
    +			});
    +
    +		// 6. resume the new execution graph from the taken savepoint
    +		final CompletableFuture<Acknowledge> rescalingFuture = suspendedFuture.thenCombineAsync(
    +			executionGraphFuture,
    +			(Void ignored, ExecutionGraph restoredExecutionGraph) -> {
    +				// check if the ExecutionGraph is still the same
    +				//noinspection ObjectEquality
    +				if (executionGraph == currentExecutionGraph) {
    +					executionGraph = restoredExecutionGraph;
    +
    +					scheduleExecutionGraph();
    +
    +					return Acknowledge.get();
    +				} else {
    +					throw new CompletionException(new JobModificationException("Detected concurrent modification of ExecutionGraph. Aborting the resacling."));
    --- End diff --
    
    nit: typo: *Aborting the rescaling*
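Steps 4 to 6 of `rescaleOperators` form a chain of futures. The old graph is suspended only after the savepoint has been taken and restored into the new graph. The new graph is installed only after the old one reports `SUSPENDED`. Below is a simplified, self-contained sketch of that ordering. It uses hypothetical string stand-ins for savepoint paths, graphs, and job statuses, and an `Executor` parameter in the role of the main thread executor. No Flink classes are involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

/** Simplified model of the future chain in rescaleOperators (not Flink code). */
final class RescalePipelineSketch {
    /** Records which steps ran, in order; assumes a single-threaded executor. */
    final List<String> steps = new ArrayList<>();

    CompletableFuture<String> rescale(
            CompletableFuture<String> savepointFuture, // completes with the savepoint path
            String oldGraphFinalStatus,                // status the old graph ends up in
            Executor mainThread) {
        // Restore the new graph from the savepoint; skipped if the savepoint failed.
        CompletableFuture<String> restoredFuture = savepointFuture.thenApplyAsync(path -> {
            steps.add("restore from " + path);
            return "newGraph";
        }, mainThread);

        // Suspend the old graph only after the restore succeeded.
        CompletableFuture<Void> suspendedFuture = restoredFuture.thenAcceptAsync(graph -> {
            steps.add("suspend old graph");
            if (!"SUSPENDED".equals(oldGraphFinalStatus)) {
                throw new CompletionException(
                    new IllegalStateException("Could not suspend the old graph."));
            }
        }, mainThread);

        // Install and schedule the new graph once both futures completed normally.
        return suspendedFuture.thenCombineAsync(restoredFuture, (ignored, graph) -> {
            steps.add("schedule " + graph);
            return graph;
        }, mainThread);
    }
}
```

The sketch leaves out the `exceptionally` branch that restarts the checkpoint scheduler after a failed savepoint or restore. It also leaves out the identity check before the swap.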


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r169917845
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -447,6 +453,165 @@ public void postStop() throws Exception {
     		return CompletableFuture.completedFuture(Acknowledge.get());
     	}
     
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleJob(
    +			int newParallelism,
    +			RescalingBehaviour rescalingBehaviour,
    +			Time timeout) {
    +		final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
    +
    +		for (JobVertex jobVertex : jobGraph.getVertices()) {
    +			allOperators.add(jobVertex.getID());
    +		}
    +
    +		return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
    +	}
    +
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleOperators(
    +			Collection<JobVertexID> operators,
    +			int newParallelism,
    +			RescalingBehaviour rescalingBehaviour,
    +			Time timeout) {
    +		// 1. Check whether we can rescale the job & rescale the respective vertices
    +		for (JobVertexID jobVertexId : operators) {
    +			final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
    +
    +			// update max parallelism in case that it has not been configure
    +			final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
    +
    +			if (executionJobVertex != null) {
    +				jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
    +			}
    +
    +			try {
    +				rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
    +			} catch (FlinkException e) {
    +				final String msg = String.format("Cannot rescale job %s.", jobGraph.getName());
    +
    +				log.info(msg, e);
    +
    +				return FutureUtils.completedExceptionally(
    +					new JobModificationException(msg, e));
    +			}
    +		}
    +
    +		final ExecutionGraph currentExecutionGraph = executionGraph;
    +
    +		final ExecutionGraph newExecutionGraph;
    +
    +		try {
    +			newExecutionGraph = ExecutionGraphBuilder.buildGraph(
    +				null,
    +				jobGraph,
    +				jobMasterConfiguration.getConfiguration(),
    +				scheduledExecutorService,
    +				scheduledExecutorService,
    +				slotPool.getSlotProvider(),
    +				userCodeLoader,
    +				highAvailabilityServices.getCheckpointRecoveryFactory(),
    +				rpcTimeout,
    +				currentExecutionGraph.getRestartStrategy(),
    +				jobMetricGroup,
    +				1,
    +				blobServer,
    +				jobMasterConfiguration.getSlotRequestTimeout(),
    +				log);
    +		} catch (JobExecutionException | JobException e) {
    +			return FutureUtils.completedExceptionally(
    +				new JobModificationException("Could not create rescaled ExecutionGraph.", e));
    +		}
    +
    +		// 3. disable checkpoint coordinator to suppress subsequent checkpoints
    +		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    +		checkpointCoordinator.stopCheckpointScheduler();
    +
    +		// 4. take a savepoint
    +		final CompletableFuture<String> savepointFuture = triggerSavepoint(
    +			jobMasterConfiguration.getTmpDirectory(),
    +			timeout);
    +
    +		final CompletableFuture<ExecutionGraph> executionGraphFuture = savepointFuture
    +			.thenApplyAsync(
    +				(String savepointPath) -> {
    +					try {
    +						newExecutionGraph.getCheckpointCoordinator().restoreSavepoint(
    +							savepointPath,
    +							false,
    +							newExecutionGraph.getAllVertices(),
    +							userCodeLoader);
    +					} catch (Exception e) {
    +						disposeSavepoint(savepointPath);
    +
    +						throw new CompletionException(new JobModificationException("Could not restore from temporary rescaling savepoint.", e));
    +					}
    +
    +					// delete the savepoint file once we reach a terminal state
    +					newExecutionGraph.getTerminationFuture()
    +						.whenCompleteAsync(
    +							(JobStatus jobStatus, Throwable throwable) -> disposeSavepoint(savepointPath),
    +							scheduledExecutorService);
    +
    +					return newExecutionGraph;
    +				}, scheduledExecutorService)
    +			.exceptionally(
    +				(Throwable failure) -> {
    +					// in case that we couldn't take a savepoint or restore from it, let's restart the checkpoint
    +					// coordinator and abort the rescaling operation
    +					if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
    +						checkpointCoordinator.startCheckpointScheduler();
    +					}
    +
    +					throw new CompletionException(failure);
    +				});
    +
    +		// 5. suspend the current job
    +		final CompletableFuture<JobStatus> terminationFuture = executionGraphFuture.thenComposeAsync(
    +			(ExecutionGraph ignored) -> {
    +				currentExecutionGraph.suspend(new FlinkException("Job is being rescaled."));
    +				return currentExecutionGraph.getTerminationFuture();
    +			},
    +			getMainThreadExecutor());
    +
    +		final CompletableFuture<Void> suspendedFuture = terminationFuture.thenAccept(
    +			(JobStatus jobStatus) -> {
    +				if (jobStatus != JobStatus.SUSPENDED) {
    +					final String msg = String.format("Job %s rescaling failed because we could not suspend the execution graph.", jobGraph.getName());
    +					log.info(msg);
    +					throw new CompletionException(new JobModificationException(msg));
    +				}
    +			});
    +
    +		// 6. resume the new execution graph from the taken savepoint
    +		final CompletableFuture<Acknowledge> rescalingFuture = suspendedFuture.thenCombineAsync(
    +			executionGraphFuture,
    +			(Void ignored, ExecutionGraph restoredExecutionGraph) -> {
    +				// check if the ExecutionGraph is still the same
    +				//noinspection ObjectEquality
    --- End diff --
    
    I think there is no warning to suppress on this line.


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r170015457
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +/**
    + * Parallelism for the rescaling of jobs specified as a {@link MessageQueryParameter}.
    + */
    +public class RescaleParallelismQueryParameter extends MessageQueryParameter<Integer> {
    +
    +	public static final String KEY = "parallelism";
    +
    +	protected RescaleParallelismQueryParameter() {
    +		super(KEY, MessageParameterRequisiteness.MANDATORY);
    +	}
    +
    +	@Override
    +	public Integer convertValueFromString(String value) {
    +		return Integer.valueOf(value);
    --- End diff --
    
    Will add it to the `JobMaster`.
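The reviewer's original remark is not quoted here. Assuming the concern is that `Integer.valueOf` also accepts zero, negative, or too-large values, a range check in the `JobMaster` might look like the following minimal sketch. `ParallelismCheck` and `parseAndValidate` are hypothetical names. The upper bound assumes that a job cannot be rescaled beyond the operator's max parallelism.

```java
/** Hypothetical validation helper for a requested parallelism (not Flink code). */
final class ParallelismCheck {
    private ParallelismCheck() {}

    /**
     * Parses the query value and checks 1 <= parallelism <= maxParallelism.
     * Throws IllegalArgumentException with a descriptive message otherwise.
     */
    static int parseAndValidate(String value, int maxParallelism) {
        final int parallelism;
        try {
            parallelism = Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Parallelism is not a number: " + value, e);
        }
        if (parallelism < 1) {
            throw new IllegalArgumentException(
                "Parallelism must be at least 1, but was " + parallelism + '.');
        }
        if (parallelism > maxParallelism) {
            throw new IllegalArgumentException("Parallelism " + parallelism
                + " exceeds the max parallelism " + maxParallelism + '.');
        }
        return parallelism;
    }
}
```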


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r170179361
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -447,6 +453,165 @@ public void postStop() throws Exception {
     		return CompletableFuture.completedFuture(Acknowledge.get());
     	}
     
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleJob(
    +			int newParallelism,
    +			RescalingBehaviour rescalingBehaviour,
    +			Time timeout) {
    +		final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
    +
    +		for (JobVertex jobVertex : jobGraph.getVertices()) {
    +			allOperators.add(jobVertex.getID());
    +		}
    +
    +		return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
    +	}
    +
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleOperators(
    --- End diff --
    
    ok


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r169922284
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +/**
    + * Parallelism for the rescaling of jobs specified as a {@link MessageQueryParameter}.
    + */
    +public class RescaleParallelismQueryParameter extends MessageQueryParameter<Integer> {
    +
    +	public static final String KEY = "parallelism";
    +
    +	protected RescaleParallelismQueryParameter() {
    +		super(KEY, MessageParameterRequisiteness.MANDATORY);
    +	}
    +
    +	@Override
    +	public Integer convertValueFromString(String value) {
    +		return Integer.valueOf(value);
    --- End diff --
    
    Class was renamed to `RescalingParallelismQueryParameter`


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r170014483
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -447,6 +453,165 @@ public void postStop() throws Exception {
     		return CompletableFuture.completedFuture(Acknowledge.get());
     	}
     
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleJob(
    +			int newParallelism,
    +			RescalingBehaviour rescalingBehaviour,
    +			Time timeout) {
    +		final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
    +
    +		for (JobVertex jobVertex : jobGraph.getVertices()) {
    +			allOperators.add(jobVertex.getID());
    +		}
    +
    +		return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
    +	}
    +
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleOperators(
    --- End diff --
    
    Yes, it will be necessary once we rescale individual operators, which will be the case with the rescaling policies.
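The split works as a general entry point plus a specific one. `rescaleJob` collects every vertex ID and delegates, so a future per-operator rescaling policy can call `rescaleOperators` with just a subset. Below is a simplified sketch of that delegation. It uses hypothetical `String` vertex IDs and a map in place of the `JobGraph`, and it omits the savepoint and graph swap.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the rescaleJob/rescaleOperators split (not Flink code). */
final class RescalingApiSketch {
    /** Stand-in for the JobGraph: vertex ID -> current parallelism. */
    final Map<String, Integer> parallelismByVertex = new LinkedHashMap<>();

    /** Rescales all vertices by delegating to the per-operator variant. */
    void rescaleJob(int newParallelism) {
        List<String> allOperators = new ArrayList<>(parallelismByVertex.keySet());
        rescaleOperators(allOperators, newParallelism);
    }

    /** Rescales only the given vertices; all others keep their parallelism. */
    void rescaleOperators(Collection<String> operators, int newParallelism) {
        for (String vertexId : operators) {
            if (!parallelismByVertex.containsKey(vertexId)) {
                throw new IllegalArgumentException("Unknown vertex " + vertexId);
            }
            parallelismByVertex.put(vertexId, newParallelism);
        }
    }
}
```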


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r169918959
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.async;
    +
    +import org.apache.flink.runtime.rest.messages.json.SerializedThrowableDeserializer;
    +import org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer;
    +import org.apache.flink.util.SerializedThrowable;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * Basic information object for asynchronous operations.
    + */
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class AsynchronousOperationInfo {
    +
    +	private static final String FIELD_NAME_FAILURE_CAUSE = "failure-cause";
    +
    +	@JsonProperty(FIELD_NAME_FAILURE_CAUSE)
    +	@JsonSerialize(using = SerializedThrowableSerializer.class)
    +	@Nullable
    +	private final SerializedThrowable failureCause;
    +
    +	private AsynchronousOperationInfo(
    +		@JsonProperty(FIELD_NAME_FAILURE_CAUSE)
    --- End diff --
    
    nit: indentation


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r169913228
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -963,6 +1120,36 @@ private Acknowledge suspendExecution(final Exception cause) {
     		return Acknowledge.get();
     	}
     
    +	/**
    +	 * Schedules the execution of the current {@link ExecutionGraph}.
    +	 */
    +	private void scheduleExecutionGraph() {
    +		try {
    +			executionGraph.scheduleForExecution();
    +		}
    +		catch (Throwable t) {
    +			executionGraph.failGlobal(t);
    +		}
    +	}
    +
    +	/**
    +	 * Dispose the savepoint stored under the given path.
    +	 *
    +	 * @param savepointPath path where the savepoint is stored
    +	 */
    +	private void disposeSavepoint(String savepointPath) {
    +		try {
    +			// delete the temporary savepoint
    +			Checkpoints.disposeSavepoint(
    +				savepointPath,
    +				jobMasterConfiguration.getConfiguration(),
    +				userCodeLoader,
    +				log);
    +		} catch (FlinkException | IOException de) {
    --- End diff --
    
    Why `de`? Just want to make sure it's not a typo.


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r170014666
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -963,6 +1120,36 @@ private Acknowledge suspendExecution(final Exception cause) {
     		return Acknowledge.get();
     	}
     
    +	/**
    +	 * Schedules the execution of the current {@link ExecutionGraph}.
    +	 */
    +	private void scheduleExecutionGraph() {
    +		try {
    +			executionGraph.scheduleForExecution();
    +		}
    +		catch (Throwable t) {
    +			executionGraph.failGlobal(t);
    +		}
    +	}
    +
    +	/**
    +	 * Dispose the savepoint stored under the given path.
    +	 *
    +	 * @param savepointPath path where the savepoint is stored
    +	 */
    +	private void disposeSavepoint(String savepointPath) {
    +		try {
    +			// delete the temporary savepoint
    +			Checkpoints.disposeSavepoint(
    +				savepointPath,
    +				jobMasterConfiguration.getConfiguration(),
    +				userCodeLoader,
    +				log);
    +		} catch (FlinkException | IOException de) {
    --- End diff --
    
    It is a typo. Will change it.


---

[GitHub] flink issue #5487: [FLINK-8656] [flip6] Add modify CLI command to rescale Fl...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5487
  
    @GJL: 
    
    The temporary savepoint is only relevant for a single `JobMaster` because the rescaling operation is not persisted. In case of a master failover, the user would have to retrigger the rescaling on the new `JobMaster`.
    
    Concerning the check: true, but this would be an unrelated change, which should be handled in a separate issue.
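The rescaling savepoint is temporary and scoped to a single `JobMaster`. For that reason, the rescaling diff in this thread deletes it once the new `ExecutionGraph` reaches a terminal state, by registering `disposeSavepoint` on `getTerminationFuture()`. The following is a minimal sketch of that cleanup pattern. It assumes a plain temporary file stands in for the savepoint. `TempSavepointCleanupSketch` and `disposeOnTermination` are hypothetical names. The sketch also uses synchronous `whenComplete` instead of `whenCompleteAsync` on an executor, so that it runs deterministically.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;

public class TempSavepointCleanupSketch {

    /**
     * Hypothetical helper: deletes tmpPath once termination completes,
     * regardless of whether it completed normally or exceptionally.
     */
    static void disposeOnTermination(CompletableFuture<?> termination, Path tmpPath) {
        termination.whenComplete((result, failure) -> {
            try {
                Files.deleteIfExists(tmpPath);
            } catch (IOException e) {
                // the sketch only reports a failed deletion instead of rethrowing it
                System.err.println("Could not delete " + tmpPath + ": " + e);
            }
        });
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("rescale-savepoint", ".bin");
        CompletableFuture<String> termination = new CompletableFuture<>();

        disposeOnTermination(termination, tmp);
        System.out.println("exists while running: " + Files.exists(tmp));

        // any terminal state triggers the cleanup, including failure
        termination.completeExceptionally(new RuntimeException("job failed"));
        System.out.println("exists after termination: " + Files.exists(tmp));
    }
}
```

Because the cleanup is attached to the termination future rather than to the success path, the temporary file is removed even if the rescaled job later fails.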


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r169634082
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.util.function;
    +
    +import java.util.function.BiConsumer;
    +
    +/**
    + * A checked extension of the {@link BiConsumer} interface.
    + *
    + * @param <T> type of the first argument
    + * @param <U> type of the second argument
    + * @param <E> type of the thrown exception
    + */
    +@FunctionalInterface
    +public interface BiConsumerWithException<T, U, E extends Throwable> extends BiConsumer<T, U> {
    +
    +	/**
    +	 * Performs this operation on the given arguments.
    +	 *
    +	 * @param t the first input argument
    +	 * @param u the second input argument
    +	 * @throws E in case of an error
    +	 */
    +	void acceptWithException(T t, U u) throws E;
    +
    +	@Override
    +	default void accept(T t, U u) {
    +		try {
    +			acceptWithException(t, u);
    +		} catch (Throwable e) {
    --- End diff --
    
    Normally, `Error`s shouldn't be caught; see https://stackoverflow.com/questions/581878/why-catch-exceptions-in-java-when-you-can-catch-throwables
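One way to address this is to rethrow `Error` and `RuntimeException` unchanged before falling back to a wrapping `catch (Throwable)`. This sketch assumes that unchecked throwables should pass through as-is and that only checked exceptions need wrapping. The interface name `CheckedBiConsumer` is hypothetical, and this is not necessarily the code that was merged.

```java
import java.io.IOException;
import java.util.function.BiConsumer;

public class CheckedBiConsumerSketch {

    /** Hypothetical variant of BiConsumerWithException that lets Errors propagate. */
    @FunctionalInterface
    interface CheckedBiConsumer<T, U, E extends Throwable> extends BiConsumer<T, U> {

        void acceptWithException(T t, U u) throws E;

        @Override
        default void accept(T t, U u) {
            try {
                acceptWithException(t, u);
            } catch (Error | RuntimeException e) {
                // unchecked throwables already satisfy BiConsumer's contract: rethrow as-is
                throw e;
            } catch (Throwable checked) {
                // only checked exceptions reach this point; wrap them
                throw new RuntimeException(checked);
            }
        }
    }

    public static void main(String[] args) {
        CheckedBiConsumer<String, Integer, IOException> failsChecked =
            (key, value) -> { throw new IOException("checked failure"); };
        try {
            failsChecked.accept("a", 1);
        } catch (RuntimeException e) {
            System.out.println("wrapped: " + e.getCause());
        }

        CheckedBiConsumer<String, Integer, RuntimeException> failsFatally =
            (key, value) -> { throw new Error("simulated fatal error"); };
        try {
            failsFatally.accept("b", 2);
        } catch (Error e) {
            // caught here only to demonstrate that it was not wrapped
            System.out.println("propagated unchanged: " + e.getMessage());
        }
    }
}
```

Since `Error`s propagate untouched, conditions such as `OutOfMemoryError` are not disguised as ordinary `RuntimeException`s.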


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5487


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r170179272
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -447,6 +453,165 @@ public void postStop() throws Exception {
     		return CompletableFuture.completedFuture(Acknowledge.get());
     	}
     
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleJob(
    +			int newParallelism,
    +			RescalingBehaviour rescalingBehaviour,
    +			Time timeout) {
    +		final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
    +
    +		for (JobVertex jobVertex : jobGraph.getVertices()) {
    +			allOperators.add(jobVertex.getID());
    +		}
    +
    +		return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
    +	}
    +
    +	@Override
    +	public CompletableFuture<Acknowledge> rescaleOperators(
    +			Collection<JobVertexID> operators,
    +			int newParallelism,
    +			RescalingBehaviour rescalingBehaviour,
    +			Time timeout) {
    +		// 1. Check whether we can rescale the job & rescale the respective vertices
    +		for (JobVertexID jobVertexId : operators) {
    +			final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
    +
    +			// update max parallelism in case that it has not been configured
    +			final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
    +
    +			if (executionJobVertex != null) {
    +				jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
    +			}
    +
    +			try {
    +				rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
    +			} catch (FlinkException e) {
    +				final String msg = String.format("Cannot rescale job %s.", jobGraph.getName());
    +
    +				log.info(msg, e);
    +
    +				return FutureUtils.completedExceptionally(
    +					new JobModificationException(msg, e));
    +			}
    +		}
    +
    +		final ExecutionGraph currentExecutionGraph = executionGraph;
    +
    +		final ExecutionGraph newExecutionGraph;
    +
    +		try {
    +			newExecutionGraph = ExecutionGraphBuilder.buildGraph(
    +				null,
    +				jobGraph,
    +				jobMasterConfiguration.getConfiguration(),
    +				scheduledExecutorService,
    +				scheduledExecutorService,
    +				slotPool.getSlotProvider(),
    +				userCodeLoader,
    +				highAvailabilityServices.getCheckpointRecoveryFactory(),
    +				rpcTimeout,
    +				currentExecutionGraph.getRestartStrategy(),
    +				jobMetricGroup,
    +				1,
    +				blobServer,
    +				jobMasterConfiguration.getSlotRequestTimeout(),
    +				log);
    +		} catch (JobExecutionException | JobException e) {
    +			return FutureUtils.completedExceptionally(
    +				new JobModificationException("Could not create rescaled ExecutionGraph.", e));
    +		}
    +
    +		// 3. disable checkpoint coordinator to suppress subsequent checkpoints
    +		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    +		checkpointCoordinator.stopCheckpointScheduler();
    +
    +		// 4. take a savepoint
    +		final CompletableFuture<String> savepointFuture = triggerSavepoint(
    +			jobMasterConfiguration.getTmpDirectory(),
    +			timeout);
    +
    +		final CompletableFuture<ExecutionGraph> executionGraphFuture = savepointFuture
    +			.thenApplyAsync(
    +				(String savepointPath) -> {
    +					try {
    +						newExecutionGraph.getCheckpointCoordinator().restoreSavepoint(
    +							savepointPath,
    +							false,
    +							newExecutionGraph.getAllVertices(),
    +							userCodeLoader);
    +					} catch (Exception e) {
    +						disposeSavepoint(savepointPath);
    +
    +						throw new CompletionException(new JobModificationException("Could not restore from temporary rescaling savepoint.", e));
    +					}
    +
    +					// delete the savepoint file once we reach a terminal state
    +					newExecutionGraph.getTerminationFuture()
    +						.whenCompleteAsync(
    +							(JobStatus jobStatus, Throwable throwable) -> disposeSavepoint(savepointPath),
    +							scheduledExecutorService);
    +
    +					return newExecutionGraph;
    +				}, scheduledExecutorService)
    +			.exceptionally(
    +				(Throwable failure) -> {
    +					// in case that we couldn't take a savepoint or restore from it, let's restart the checkpoint
    +					// coordinator and abort the rescaling operation
    +					if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
    +						checkpointCoordinator.startCheckpointScheduler();
    +					}
    +
    +					throw new CompletionException(failure);
    +				});
    +
    +		// 5. suspend the current job
    +		final CompletableFuture<JobStatus> terminationFuture = executionGraphFuture.thenComposeAsync(
    +			(ExecutionGraph ignored) -> {
    +				currentExecutionGraph.suspend(new FlinkException("Job is being rescaled."));
    +				return currentExecutionGraph.getTerminationFuture();
    +			},
    +			getMainThreadExecutor());
    +
    +		final CompletableFuture<Void> suspendedFuture = terminationFuture.thenAccept(
    +			(JobStatus jobStatus) -> {
    +				if (jobStatus != JobStatus.SUSPENDED) {
    +					final String msg = String.format("Job %s rescaling failed because we could not suspend the execution graph.", jobGraph.getName());
    +					log.info(msg);
    +					throw new CompletionException(new JobModificationException(msg));
    +				}
    +			});
    +
    +		// 6. resume the new execution graph from the taken savepoint
    +		final CompletableFuture<Acknowledge> rescalingFuture = suspendedFuture.thenCombineAsync(
    +			executionGraphFuture,
    +			(Void ignored, ExecutionGraph restoredExecutionGraph) -> {
    +				// check if the ExecutionGraph is still the same
    +				//noinspection ObjectEquality
    --- End diff --
    
    ok
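The truncated guard at the end of this hunk (`// check if the ExecutionGraph is still the same` followed by `//noinspection ObjectEquality`) suggests an intentional reference comparison. The graph captured before the asynchronous savepoint must be the very same object when the rescaled graph is swapped in. The following is a minimal sketch of that pattern. It assumes all state is touched from a single thread, as `JobMaster`'s main-thread executor would ensure. `GraphSwapSketch` and its `Graph` type are hypothetical stand-ins.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class GraphSwapSketch {

    /** Hypothetical stand-in for ExecutionGraph; deliberately has no equals(). */
    static final class Graph {
        final int parallelism;

        Graph(int parallelism) {
            this.parallelism = parallelism;
        }
    }

    // assumed to be read and written from a single thread only
    private Graph executionGraph = new Graph(1);

    Graph current() {
        return executionGraph;
    }

    /** Swaps in a rescaled graph once the savepoint exists, unless the graph changed meanwhile. */
    CompletableFuture<Void> rescale(int newParallelism, CompletableFuture<String> savepointFuture) {
        final Graph expected = executionGraph; // captured before going asynchronous
        final Graph rescaled = new Graph(newParallelism);

        return savepointFuture.thenAccept(savepointPath -> {
            // identity check on purpose: any other Graph instance means a concurrent change
            if (executionGraph != expected) {
                throw new CompletionException(
                    new IllegalStateException("ExecutionGraph changed while rescaling."));
            }
            // restoring the new graph from savepointPath is omitted in this sketch
            executionGraph = rescaled;
        });
    }

    public static void main(String[] args) {
        GraphSwapSketch master = new GraphSwapSketch();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();

        CompletableFuture<Void> firstResult = master.rescale(4, first);
        CompletableFuture<Void> secondResult = master.rescale(8, second);

        second.complete("savepoint-2"); // dependent action runs on this thread
        secondResult.join();
        System.out.println("parallelism: " + master.current().parallelism);

        first.complete("savepoint-1"); // stale: the expected graph was already replaced
        System.out.println("first rejected: " + firstResult.isCompletedExceptionally());
    }
}
```

An `equals`-based comparison would not help here. Two distinct graphs for the same job are different executions, so only object identity tells whether the graph being replaced is the one the rescaling started from.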


---

[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r170013389
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster;
    +
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.function.BiConsumerWithException;
    +
    +/**
    + * Definition of the rescaling behaviour.
    + */
    +public enum RescalingBehaviour implements BiConsumerWithException<JobVertex, Integer, FlinkException> {
    +	// rescaling is only executed if the operator can be set to the given parallelism
    +	STRICT {
    +		@Override
    +		public void acceptWithException(JobVertex jobVertex, Integer newParallelism) throws FlinkException {
    +			if (jobVertex.getMaxParallelism() < newParallelism) {
    +				throw new FlinkException("Cannot rescale vertex " + jobVertex.getName() +
    +					" because its maximum parallelism " + jobVertex.getMaxParallelism() +
    +					" is smaller than the new parallelism " + newParallelism + '.');
    +			} else {
    +				jobVertex.setParallelism(newParallelism);
    +			}
    +		}
    +	},
    +	// the new parallelism will be the minimum of the given parallelism and the maximum parallelism
    +	RELAXED {
    --- End diff --
    
    Yes
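The body of `RELAXED` is cut off in this excerpt. Per its comment, it caps the requested parallelism at the vertex's maximum parallelism instead of failing like `STRICT`. The following self-contained sketch shows both behaviours. It uses a hypothetical `Vertex` class in place of `JobVertex` and a hypothetical `RescaleException` in place of `FlinkException`, and it is not the actual Flink implementation.

```java
public class RescalingBehaviourSketch {

    /** Hypothetical stand-in for FlinkException. */
    static class RescaleException extends Exception {
        RescaleException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for JobVertex, reduced to the fields the behaviours touch. */
    static final class Vertex {
        final String name;
        final int maxParallelism;
        int parallelism;

        Vertex(String name, int maxParallelism, int parallelism) {
            this.name = name;
            this.maxParallelism = maxParallelism;
            this.parallelism = parallelism;
        }
    }

    enum Behaviour {
        // fail if the requested parallelism exceeds the maximum parallelism
        STRICT {
            @Override
            void apply(Vertex vertex, int newParallelism) throws RescaleException {
                if (vertex.maxParallelism < newParallelism) {
                    throw new RescaleException("Cannot rescale vertex " + vertex.name
                        + " because its maximum parallelism " + vertex.maxParallelism
                        + " is smaller than the new parallelism " + newParallelism + '.');
                }
                vertex.parallelism = newParallelism;
            }
        },
        // cap the requested parallelism at the maximum parallelism instead of failing
        RELAXED {
            @Override
            void apply(Vertex vertex, int newParallelism) {
                vertex.parallelism = Math.min(newParallelism, vertex.maxParallelism);
            }
        };

        abstract void apply(Vertex vertex, int newParallelism) throws RescaleException;
    }

    public static void main(String[] args) {
        Vertex relaxed = new Vertex("map", 8, 2);
        Vertex strict = new Vertex("map", 8, 2);
        try {
            Behaviour.RELAXED.apply(relaxed, 16);
            Behaviour.STRICT.apply(strict, 16);
        } catch (RescaleException e) {
            System.out.println(e.getMessage());
        }
        // RELAXED was capped at 8; STRICT rejected the request and left the vertex at 2
        System.out.println(relaxed.parallelism + " / " + strict.parallelism);
    }
}
```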


---