Posted to issues@flink.apache.org by dawidwys <gi...@git.apache.org> on 2018/07/09 07:21:56 UTC

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

GitHub user dawidwys opened a pull request:

    https://github.com/apache/flink/pull/6283

    [FLINK-9143] Use cluster strategy if none was set on client side

    ## What is the purpose of the change
    
    The goal of this PR is to make the default restart strategy configurable from the server-side configuration.
    
    
    ## Brief change log
    
      * no strategy is set on the client side if none is explicitly specified
      * on the server side the strategy is resolved in this order: client configuration, then server-side configuration; if none is set on the client side, `NoRestartStrategy` is set on the server side, and checkpointing is enabled, it falls back to `FixedDelayStrategy`
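
The resolution order in the change log can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `StrategyKind` and `ResolutionOrderSketch.resolve` are hypothetical stand-ins for Flink's restart strategy types, and a `null` client value models "none explicitly specified".

```java
// Hypothetical stand-ins for illustration; these are not Flink's classes.
enum StrategyKind { NO_RESTART, FIXED_DELAY, FAILURE_RATE }

final class ResolutionOrderSketch {

    /**
     * @param clientStrategy strategy set on the client side, or null if none was set
     * @param serverStrategy strategy configured on the server side
     * @param checkpointingEnabled whether checkpointing is enabled for the job
     */
    static StrategyKind resolve(
            StrategyKind clientStrategy,
            StrategyKind serverStrategy,
            boolean checkpointingEnabled) {

        // 1. An explicit client-side setting always wins.
        if (clientStrategy != null) {
            return clientStrategy;
        }
        // 2. Nothing set on the client, "no restart" on the server, and
        //    checkpointing enabled: fall back to a fixed-delay strategy.
        if (serverStrategy == StrategyKind.NO_RESTART && checkpointingEnabled) {
            return StrategyKind.FIXED_DELAY;
        }
        // 3. Otherwise the server-side configuration is used.
        return serverStrategy;
    }
}
```

For example, `resolve(StrategyKind.NO_RESTART, StrategyKind.FIXED_DELAY, true)` yields `NO_RESTART`, which corresponds to the `testClientSideHighestPriority` case in `RestartStrategyResolvingTest`.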
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
      - RestartStrategyResolvingTest.java
      - tests using cluster pass
    
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dawidwys/flink FLINK-9143

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6283.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6283
    
----
commit 8efded9c1e1a555edd7733b35d9f1f49f8cc7304
Author: Dawid Wysakowicz <dw...@...>
Date:   2018-07-05T11:48:23Z

    [FLINK-9143] Use cluster strategy if none was set on client side

----


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6283#discussion_r202070710
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java ---
    @@ -83,6 +86,10 @@ public void testCoordinatorShutsDownOnFailure() {
     						CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
     						true),
     					null));
    +
    +			ExecutionConfig executionConfig = new ExecutionConfig();
    +			executionConfig.setRestartStrategy(RestartStrategies.fallBackRestart());
    --- End diff --
    
    Should we maybe set the `FallbackRestartStrategyConfiguration` by default in the `ExecutionConfig`? That way, we could also simplify the resolve code.


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6283#discussion_r202298725
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---
    @@ -357,6 +362,42 @@ public void testRestoringFromSavepoint() throws Exception {
     		}
     	}
     
    +	/**
    +	 * Tests that in a streaming use case where checkpointing is enabled, a
    +	 * fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
    +	 * strategy has been specified.
    +	 */
    +	@Test
    +	public void testAutomaticRestartingWhenCheckpointing() throws Exception {
    +		// create savepoint data
    +		final long savepointId = 42L;
    +		final File savepointFile = createSavepoint(savepointId);
    +
    +		// set savepoint settings
    +		final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath(
    +			savepointFile.getAbsolutePath(),
    +			true);
    +		final JobGraph jobGraph = createJobGraphWithCheckpointing(savepointRestoreSettings);
    +
    +		final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
    +		final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(
    +			completedCheckpointStore,
    +			new StandaloneCheckpointIDCounter());
    +		haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
    +		final JobMaster jobMaster = createJobMaster(
    +			new Configuration(),
    +			jobGraph,
    +			haServices,
    +			new TestingJobManagerSharedServicesBuilder().build());
    --- End diff --
    
    Changing this line to
    ```
    new TestingJobManagerSharedServicesBuilder()
    				.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
    				.build()
    ```
    will make the test fail.


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6283#discussion_r202065356
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.executiongraph.restart;
    +
    +import org.apache.flink.api.common.time.Time;
    +
    +import org.junit.Test;
    +
    +import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Tests for {@link RestartStrategyResolving}.
    + */
    +public class RestartStrategyResolvingTest {
    +
    +	@Test
    +	public void testClientSideHighestPriority() {
    +
    +		RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(noRestart(),
    +			new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(2, 1000L),
    +			true);
    +
    +		assertTrue(resolvedStrategy instanceof NoRestartStrategy);
    --- End diff --
    
    In the future, I would suggest using Hamcrest matchers, because they give better failure messages and are more expressive.


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6283#discussion_r202012704
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.executiongraph.restart;
    +
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * Utility method for resolving {@link RestartStrategy}.
    + */
    +public final class RestartStrategyResolving {
    +
    +	private static final long DEFAULT_RESTART_DELAY = 0;
    +
    +	/**
    +	 * Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
    +	 * The resolving strategy is as follows:
    +	 * <ol>
    +	 * <li>Strategy set within job graph.</li>
    +	 * <li>Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing is enabled.</li>
    +	 * <li>If no strategy was set on client and server side and checkpointing was enabled then {@link FixedDelayRestartStrategy} is used</li>
    +	 * </ol>
    +	 *
    +	 * @param clientConfiguration    restart configuration given within the job graph
    +	 * @param serverStrategyFactory  default server side strategy factory
    +	 * @param isCheckpointingEnabled if checkpointing was enabled for the job
    --- End diff --
    
    Please don't align the Javadoc parameter descriptions. Whenever someone renames a parameter, they will be tempted to also fix the now-wrong alignment, which is unnecessary work.


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6283#discussion_r202333518
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
    @@ -567,22 +560,16 @@ private void configureCheckpointing() {
     
     		long interval = cfg.getCheckpointInterval();
     		if (interval > 0) {
    -
     			ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
     			// propagate the expected behaviour for checkpoint errors to task.
     			executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
    -
    -			// check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
    -			if (executionConfig.getRestartStrategy() == null) {
    -				// if the user enabled checkpointing, the default number of exec retries is infinite.
    -				executionConfig.setRestartStrategy(
    -					RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
    -			}
     		} else {
     			// interval of max value means disable periodic checkpoint
     			interval = Long.MAX_VALUE;
     		}
     
    +
    +
    --- End diff --
    
    Remove two line breaks


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6283#discussion_r202304777
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
    @@ -339,12 +339,28 @@ public void setSnapshotSettings(JobCheckpointingSettings settings) {
     	 * Gets the settings for asynchronous snapshots. This method returns null, when
     	 * checkpointing is not enabled.
     	 *
    -	 * @return The snapshot settings, or null, if checkpointing is not enabled.
    +	 * @return The snapshot settings
     	 */
     	public JobCheckpointingSettings getCheckpointingSettings() {
     		return snapshotSettings;
     	}
     
    +	/**
    +	 * Checks if the checkpointing was enabled for this job graph
    +	 *
    +	 * @return true if checkpointing enabled
    +	 */
    +	public boolean isCheckpointingEnabled() {
    +
    +		if (snapshotSettings == null) {
    +			return false;
    +		}
    +
    +		long checkpointInterval = snapshotSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval();
    +		return checkpointInterval > 0 &&
    +			checkpointInterval < Long.MAX_VALUE;
    --- End diff --
    
    I think, technically, we enable checkpointing (meaning we create a `CheckpointCoordinator`) iff `snapshotSettings != null`. Alternatively, we could check `CheckpointCoordinator.isPeriodicCheckpointingConfigured` to decide whether checkpointing is enabled. Then we would not need to introduce this method, which could go out of sync with how we define whether checkpointing is enabled. What do you think?
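
To illustrate how the two definitions discussed here can diverge, here is a minimal sketch. It assumes a nullable `Long` interval as a stand-in for `snapshotSettings` (null meaning no settings at all); the class and method names are hypothetical and not part of Flink.

```java
// Hypothetical sketch; a null interval models snapshotSettings == null.
final class CheckpointingEnabledSketch {

    // Mirrors the check in the diff: settings exist and the interval is a real period.
    static boolean hasPeriodicInterval(Long checkpointInterval) {
        if (checkpointInterval == null) {
            return false;
        }
        return checkpointInterval > 0 && checkpointInterval < Long.MAX_VALUE;
    }

    // Mirrors the alternative definition: settings exist at all.
    static boolean hasSnapshotSettings(Long checkpointInterval) {
        return checkpointInterval != null;
    }

    public static void main(String[] args) {
        // Long.MAX_VALUE is the "periodic checkpointing disabled" marker
        // seen in the StreamingJobGraphGenerator diff.
        Long disabledMarker = Long.MAX_VALUE;
        System.out.println(hasSnapshotSettings(disabledMarker)); // prints "true"
        System.out.println(hasPeriodicInterval(disabledMarker)); // prints "false"
    }
}
```

With the interval set to `Long.MAX_VALUE`, the two checks disagree, which is the kind of drift the comment warns about.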


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6283#discussion_r202320783
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---
    @@ -357,6 +362,42 @@ public void testRestoringFromSavepoint() throws Exception {
     		}
     	}
     
    +	/**
    +	 * Tests that in a streaming use case where checkpointing is enabled, a
    +	 * fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
    +	 * strategy has been specified.
    +	 */
    +	@Test
    +	public void testAutomaticRestartingWhenCheckpointing() throws Exception {
    +		// create savepoint data
    +		final long savepointId = 42L;
    +		final File savepointFile = createSavepoint(savepointId);
    +
    +		// set savepoint settings
    +		final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath(
    +			savepointFile.getAbsolutePath(),
    +			true);
    +		final JobGraph jobGraph = createJobGraphWithCheckpointing(savepointRestoreSettings);
    +
    +		final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
    +		final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(
    +			completedCheckpointStore,
    +			new StandaloneCheckpointIDCounter());
    +		haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
    +		final JobMaster jobMaster = createJobMaster(
    +			new Configuration(),
    +			jobGraph,
    +			haServices,
    +			new TestingJobManagerSharedServicesBuilder().build());
    --- End diff --
    
    This was caused by incorrect handling of the default value in `RestartStrategyFactory`. Fixed now.


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6283#discussion_r202297182
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategy.java ---
    @@ -0,0 +1,87 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.executiongraph.restart;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.concurrent.ScheduledExecutor;
    +
    +/**
    + * Default restart strategy that resolves either to {@link NoRestartStrategy} or {@link FixedDelayRestartStrategy}
    + * depending if checkpointing was enabled.
    + */
    +public class NoOrFixedIfCheckpointingEnabledRestartStrategy implements RestartStrategy {
    +
    +	private static final long DEFAULT_RESTART_DELAY = 0;
    +
    +	private final RestartStrategy resolvedStrategy;
    +
    +	/**
    +	 * Creates a NoOrFixedIfCheckpointingEnabledRestartStrategyFactory instance.
    +	 *
    +	 * @param configuration Configuration object which is ignored
    +	 * @return NoOrFixedIfCheckpointingEnabledRestartStrategyFactory instance
    +	 */
    +	public static NoOrFixedIfCheckpointingEnabledRestartStrategyFactory createFactory(Configuration configuration) {
    +		return new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
    +	}
    +
    +	/**
    +	 * Creates instance of NoOrFixedIfCheckpointingEnabledRestartStrategy
    +	 *
    +	 * @param isCheckpointingEnabled if true resolves to {@link FixedDelayRestartStrategy}
    +	 * otherwise to {@link NoRestartStrategy}
    +	 */
    +	public NoOrFixedIfCheckpointingEnabledRestartStrategy(boolean isCheckpointingEnabled) {
    +		if (isCheckpointingEnabled) {
    +			resolvedStrategy = new FixedDelayRestartStrategy(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY);
    +		} else {
    +			resolvedStrategy = new NoRestartStrategy();
    +		}
    +	}
    +
    +	@Override
    +	public boolean canRestart() {
    +		return resolvedStrategy.canRestart();
    +	}
    +
    +	@Override
    +	public void restart(RestartCallback restarter, ScheduledExecutor executor) {
    +		resolvedStrategy.restart(restarter, executor);
    +	}
    +
    +	public static class NoOrFixedIfCheckpointingEnabledRestartStrategyFactory extends RestartStrategyFactory {
    --- End diff --
    
    Wouldn't it be enough to only have this restart strategy factory without the corresponding `RestartStrategy`? We could instantiate the respective strategies in the `createRestartStrategy(boolean isCheckpointingEnabled)` method.
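
A factory-only variant along these lines might look like the sketch below. All types here (`SketchRestartStrategy`, `SketchNoRestart`, `SketchFixedDelay`, `NoOrFixedFactorySketch`) are simplified, hypothetical stand-ins rather than Flink's classes; only the shape of `createRestartStrategy(boolean isCheckpointingEnabled)` is taken from the comment.

```java
// Hypothetical stand-ins for illustration; these are not Flink's classes.
interface SketchRestartStrategy {
    boolean canRestart();
}

final class SketchNoRestart implements SketchRestartStrategy {
    @Override
    public boolean canRestart() {
        return false;
    }
}

final class SketchFixedDelay implements SketchRestartStrategy {
    private final int maxAttempts;
    private final long delayMillis;

    SketchFixedDelay(int maxAttempts, long delayMillis) {
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    @Override
    public boolean canRestart() {
        // Simplified: this sketch does not count attempts already used.
        return maxAttempts > 0;
    }
}

final class NoOrFixedFactorySketch {
    private static final long DEFAULT_RESTART_DELAY = 0L;

    // The factory picks the concrete strategy directly, so no wrapper
    // strategy that delegates to a resolved strategy is needed.
    SketchRestartStrategy createRestartStrategy(boolean isCheckpointingEnabled) {
        if (isCheckpointingEnabled) {
            return new SketchFixedDelay(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY);
        }
        return new SketchNoRestart();
    }
}
```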


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6283


---

[GitHub] flink issue #6283: [FLINK-9143] Use cluster strategy if none was set on clie...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/6283
  
    Alright, I think I fetched the last commit as well. Once Travis gives the green light, I'll merge it.


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6283#discussion_r202098798
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java ---
    @@ -83,6 +86,10 @@ public void testCoordinatorShutsDownOnFailure() {
     						CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
     						true),
     					null));
    +
    +			ExecutionConfig executionConfig = new ExecutionConfig();
    +			executionConfig.setRestartStrategy(RestartStrategies.fallBackRestart());
    --- End diff --
    
    Right now, null is a bit different from `FallbackRestartStrategy`:
    * null - allows falling back to `FixedRestartStrategy` if checkpointing is enabled and `noRestart` was set on the server side
    * `FallbackRestartStrategy` - the server-side strategy is always used (regardless of checkpointing)
    
    If we set the `FallbackStrategy` by default, we have two options:
    * we always set `FixedRestartStrategy` if checkpointing is enabled and `noRestart` was set on the server side
    * we never automatically fall back to `FixedRestartStrategy`, even when checkpointing is enabled
    
    Which option do you think is better: keep the null, always fall back to `FixedRestartStrategy`, or never fall back to it?
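
The difference between the two client-side states described above can be written down as a small sketch. It is an illustration under assumptions: `ClientSetting`, `ServerKind`, and `NullVersusFallbackSketch` are hypothetical names, `UNSET` models the null case, and explicitly chosen client strategies are left out.

```java
// Hypothetical stand-ins for illustration; these are not Flink's classes.
enum ServerKind { NO_RESTART, FIXED_DELAY }

// UNSET models a null client configuration; FALLBACK models FallbackRestartStrategy.
enum ClientSetting { UNSET, FALLBACK }

final class NullVersusFallbackSketch {

    static ServerKind resolve(ClientSetting client, ServerKind server, boolean checkpointingEnabled) {
        if (client == ClientSetting.FALLBACK) {
            // Always the server-side strategy, regardless of checkpointing.
            return server;
        }
        // UNSET: "no restart" on the server is overridden when checkpointing is enabled.
        if (server == ServerKind.NO_RESTART && checkpointingEnabled) {
            return ServerKind.FIXED_DELAY;
        }
        return server;
    }
}
```

The only input on which the two states differ is a server-side "no restart" combined with enabled checkpointing.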


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6283#discussion_r202071547
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.executiongraph.restart;
    +
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * Utility method for resolving {@link RestartStrategy}.
    + */
    +public final class RestartStrategyResolving {
    +
    +	private static final long DEFAULT_RESTART_DELAY = 0;
    +
    +	/**
    +	 * Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
    +	 * The resolving strategy is as follows:
    +	 * <ol>
    +	 * <li>Strategy set within job graph.</li>
    +	 * <li>Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing is enabled.</li>
    +	 * <li>If no strategy was set on client and server side and checkpointing was enabled then {@link FixedDelayRestartStrategy} is used</li>
    +	 * </ol>
    +	 *
    +	 * @param clientConfiguration    restart configuration given within the job graph
    +	 * @param serverStrategyFactory  default server side strategy factory
    +	 * @param isCheckpointingEnabled if checkpointing was enabled for the job
    +	 * @return resolved strategy
    +	 */
    +	public static RestartStrategy resolve(
    +			@Nullable RestartStrategies.RestartStrategyConfiguration clientConfiguration,
    --- End diff --
    
    By setting the default restart strategy to `FallbackRestartStrategyConfiguration` in the `ExecutionConfig` we could remove the `@Nullable` annotation here and simplify the code by avoiding the null checks.
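
As a rough sketch of this suggestion, a non-null default removes the need for a nullable parameter. Everything below is hypothetical: `ClientRestartConfig`, `ExecutionConfigSketch`, and `NonNullResolveSketch` only imitate the shape of the idea and are not Flink's `ExecutionConfig` or `RestartStrategies`.

```java
// Hypothetical stand-ins for illustration; these are not Flink's classes.
enum ClientRestartConfig { FALLBACK, NO_RESTART, FIXED_DELAY }

final class ExecutionConfigSketch {
    // Defaulting to FALLBACK means the field is never null.
    private ClientRestartConfig restartStrategy = ClientRestartConfig.FALLBACK;

    void setRestartStrategy(ClientRestartConfig restartStrategy) {
        this.restartStrategy =
            java.util.Objects.requireNonNull(restartStrategy, "restartStrategy must not be null");
    }

    ClientRestartConfig getRestartStrategy() {
        return restartStrategy;
    }
}

final class NonNullResolveSketch {
    // No nullable parameter and no null check: FALLBACK is an ordinary value.
    static String resolve(ClientRestartConfig client, String serverStrategyName) {
        return client == ClientRestartConfig.FALLBACK ? serverStrategyName : client.name();
    }
}
```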


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6283#discussion_r202070020
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java ---
    @@ -33,12 +33,11 @@
     public class RestartStrategyTest extends TestLogger {
     
     	/**
    -	 * Tests that in a streaming use case where checkpointing is enabled, a
    -	 * fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
    -	 * strategy has been specified.
    +	 * Tests that in a streaming use case where checkpointing is enabled, there is no default strategy set on the
    +	 * client side.
     	 */
     	@Test
    -	public void testAutomaticRestartingWhenCheckpointing() throws Exception {
    +	public void testNoDefaultStrategyOnClientSideWhenCheckpointing() throws Exception {
    --- End diff --
    
    Maybe `testNoDefaultStrategyOnClientSideWhenCheckpointingEnabled`


---

[GitHub] flink issue #6283: [FLINK-9143] Use cluster strategy if none was set on clie...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/6283
  
    Thanks @tillrohrmann for the review. I've pushed one more commit that fixes a test failure. It adds a proper comparison of `RestartStrategies`; otherwise `org.apache.flink.api.common.ExecutionConfigTest#testExecutionConfigSerialization` fails.


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6283#discussion_r202297327
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java ---
    @@ -149,7 +149,7 @@ public static RestartStrategyFactory createRestartStrategyFactory(Configuration
     				}
     
     				// fallback in case of an error
    -				return NoRestartStrategy.createFactory(configuration);
    +				return NoOrFixedIfCheckpointingEnabledRestartStrategy.createFactory(configuration);
    --- End diff --
    
    I think we should also create this factory if the `restart-strategy` configuration value is `"none"`, which is the default value.


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6283#discussion_r202013103
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.executiongraph.restart;
    +
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * Utility method for resolving {@link RestartStrategy}.
    + */
    +public final class RestartStrategyResolving {
    +
    +	private static final long DEFAULT_RESTART_DELAY = 0;
    +
    +	/**
    +	 * Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
    +	 * The resolving strategy is as follows:
    +	 * <ol>
    +	 * <li>Strategy set within job graph.</li>
    +	 * <li>Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing is enabled.</li>
    +	 * <li>If no strategy was set on client and server side and checkpointing was enabled then {@link FixedDelayRestartStrategy} is used</li>
    +	 * </ol>
    +	 *
    +	 * @param clientConfiguration    restart configuration given within the job graph
    +	 * @param serverStrategyFactory  default server side strategy factory
    +	 * @param isCheckpointingEnabled if checkpointing was enabled for the job
    +	 * @return resolved strategy
    +	 */
    +	public static RestartStrategy resolve(
    +			@Nullable RestartStrategies.RestartStrategyConfiguration clientConfiguration,
    +			RestartStrategyFactory serverStrategyFactory,
    +			boolean isCheckpointingEnabled) {
    +
    +		final RestartStrategy serverSideRestartStrategy = serverStrategyFactory.createRestartStrategy();
    +
    +		RestartStrategy clientSideRestartStrategy = null;
    --- End diff --
    
    could be `final`


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6283#discussion_r202103545
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java ---
    @@ -83,6 +86,10 @@ public void testCoordinatorShutsDownOnFailure() {
     						CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
     						true),
     					null));
    +
    +			ExecutionConfig executionConfig = new ExecutionConfig();
    +			executionConfig.setRestartStrategy(RestartStrategies.fallBackRestart());
    --- End diff --
    
    You're right. I'm just wondering whether you would ever want to enable checkpointing without a restart strategy. In other words, if you set `FallbackRestartStrategy`, enable checkpointing, and set `NoRestartStrategy` as the server-side `RestartStrategy`, do you want `FixedRestartStrategy` or `NoRestartStrategy`?
    
    On the other hand, you might want to disable restarting for all jobs running on your cluster by setting the restart strategy to `NoRestartStrategy`.
    
    Maybe the proper solution would be to set `ExecutionConfig#restartStrategy` to `FallbackRestartStrategy` and introduce a new default server-side restart strategy, `NoOrFixedIfCheckpointingEnabled`, which resolves to `FixedRestartStrategy` if checkpointing is enabled and to `NoRestartStrategy` otherwise.
    
    What do you think?
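
The proposal can be sketched as follows, assuming the client side always hands over to the server (the `FallbackRestartStrategy` case). `ServerSetting`, `Resolved`, and `ProposedDefaultSketch` are hypothetical names used only for this illustration; `DEFAULT` models a server on which no restart strategy was configured.

```java
// Hypothetical stand-ins for illustration; these are not Flink's classes.
enum ServerSetting { DEFAULT, NO_RESTART, FIXED_DELAY }

enum Resolved { NO_RESTART, FIXED_DELAY }

final class ProposedDefaultSketch {

    static Resolved resolveServerSide(ServerSetting server, boolean checkpointingEnabled) {
        switch (server) {
            case NO_RESTART:
                // Explicitly configured: restarting stays disabled, even with checkpointing.
                return Resolved.NO_RESTART;
            case FIXED_DELAY:
                return Resolved.FIXED_DELAY;
            default:
                // Nothing configured: the "no or fixed if checkpointing enabled" default decides.
                return checkpointingEnabled ? Resolved.FIXED_DELAY : Resolved.NO_RESTART;
        }
    }
}
```

Under this scheme an operator can still switch off restarting for the whole cluster, while jobs on an unconfigured cluster keep the automatic fixed-delay behaviour when checkpointing is enabled.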


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6283#discussion_r202065050
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.executiongraph.restart;
    +
    +import org.apache.flink.api.common.time.Time;
    +
    +import org.junit.Test;
    +
    +import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Tests for {@link RestartStrategyResolving}.
    + */
    +public class RestartStrategyResolvingTest {
    --- End diff --
    
    Test classes should extend from `TestLogger` to give better test logging output separation.


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6283#discussion_r202319384
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
    @@ -339,12 +339,28 @@ public void setSnapshotSettings(JobCheckpointingSettings settings) {
     	 * Gets the settings for asynchronous snapshots. This method returns null, when
     	 * checkpointing is not enabled.
     	 *
    -	 * @return The snapshot settings, or null, if checkpointing is not enabled.
    +	 * @return The snapshot settings
     	 */
     	public JobCheckpointingSettings getCheckpointingSettings() {
     		return snapshotSettings;
     	}
     
    +	/**
    +	 * Checks if the checkpointing was enabled for this job graph
    +	 *
    +	 * @return true if checkpointing enabled
    +	 */
    +	public boolean isCheckpointingEnabled() {
    +
    +		if (snapshotSettings == null) {
    +			return false;
    +		}
    +
    +		long checkpointInterval = snapshotSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval();
    +		return checkpointInterval > 0 &&
    +			checkpointInterval < Long.MAX_VALUE;
    --- End diff --
    
    I don't think that is true (about checkpointing being enabled). I thought the same based on some Javadocs, but it turned out that `snapshotSettings` is always set in `org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator#configureCheckpointing`.
    That's why I added this method.
    
    The problem with the second method is that the `CheckpointCoordinator` is created while constructing the `ExecutionGraph`, which itself requires the restart strategy. I thought adding this method was the least invasive option.


---

[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6283#discussion_r202311125
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java ---
    @@ -149,7 +149,7 @@ public static RestartStrategyFactory createRestartStrategyFactory(Configuration
     				}
     
     				// fallback in case of an error
    -				return NoRestartStrategy.createFactory(configuration);
    +				return NoOrFixedIfCheckpointingEnabledRestartStrategy.createFactory(configuration);
    --- End diff --
    
    I don't know why, but I assumed the `default` branch is reached when nothing was set in the config. My mistake.
    
    I've fixed it to differentiate between the case where `"none"` was set (this value is used across the documentation, so I think it should translate directly to `NoRestart`) and the case where the config was not set at all.
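
The distinction described above, between an explicit `"none"` and a missing setting, could look roughly like this. It is a minimal sketch under stated assumptions: the configuration is modelled as a plain `Map`, the key name `restart-strategy` and the value `fixed-delay` are assumed here, the class and enum names are hypothetical, and unknown values simply throw instead of mirroring the error fallback in `RestartStrategyFactory`.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch; the real logic lives in RestartStrategyFactory and differs in detail. */
final class RestartConfigSketch {

    enum ServerStrategy { NO_RESTART, FIXED_DELAY, NO_OR_FIXED_IF_CHECKPOINTING_ENABLED }

    /** Assumed configuration key. */
    static final String KEY = "restart-strategy";

    static ServerStrategy fromConfig(Map<String, String> config) {
        String value = config.get(KEY);
        if (value == null) {
            // Nothing configured at all: use the checkpointing-dependent default.
            return ServerStrategy.NO_OR_FIXED_IF_CHECKPOINTING_ENABLED;
        }
        switch (value.toLowerCase(Locale.ROOT)) {
            case "none":
                // An explicit "none" translates directly to no restarts.
                return ServerStrategy.NO_RESTART;
            case "fixed-delay":
                return ServerStrategy.FIXED_DELAY;
            default:
                // Simplification for this sketch: reject anything else.
                throw new IllegalArgumentException("Unknown restart strategy: " + value);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        System.out.println(fromConfig(config)); // key absent
        config.put(KEY, "none");
        System.out.println(fromConfig(config)); // explicit "none"
    }
}
```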


---