Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2018/06/26 08:29:15 UTC

[GitHub] flink pull request #6208: Disable local recovery scheduling

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/6208

    Disable local recovery scheduling

    ## What is the purpose of the change
    
    Introduce a SchedulingStrategy which is used by the SlotPool to schedule tasks.
    The default implementation is LocationPreferenceSchedulingStrategy which tries
    to schedule tasks to their preferred locations. In order to support local recovery
    the PreviousAllocationSchedulingStrategy schedules tasks to their previous
    allocation.
    
    The scheduling strategy is selected based on the configuration option
    state.backend.local-recovery. If set to true, then PreviousAllocationSchedulingStrategy
    is selected. Otherwise LocationPreferenceSchedulingStrategy is selected.
    
    ## Brief change log
    
    - Introduce a `SlotPoolFactory` to make the `SlotPool` instantiation configurable
    - Introduce `SchedulingStrategy` as scheduling logic abstraction
    - Introduce `LocationPreferenceSchedulingStrategy` as default implementation
    - Introduce `PreviousAllocationSchedulingStrategy` as scheduling strategy for local recovery
    - Move scheduling logic out of `SlotProfile` into `SchedulingStrategy`
    - Instantiate `SchedulingStrategy` based on `state.backend.local-recovery` option
    
    ## Verifying this change
    
    - Added `SchedulingITCase` which covers the problem with local recovery
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink disableLocalRecoveryScheduling

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6208.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6208
    
----
commit 9fd63001497793ce0ed7e224d5678842e34484cd
Author: Till Rohrmann <tr...@...>
Date:   2018-06-25T08:16:31Z

    [hotfix] Introduce SlotPoolResource and TestingRpcServiceResource

commit 1f867c9a1f3524dce0329d3198b950e2ad62a82c
Author: Till Rohrmann <tr...@...>
Date:   2018-06-25T08:34:30Z

    [hotfix] Introduce SlotPoolFactory to make SlotPool instantiation configurable

commit b552a2344529a272ec098bfd9c82d4a30f860726
Author: Till Rohrmann <tr...@...>
Date:   2018-06-26T08:01:19Z

    [hotfix] Remove SlotIdleTimeout from JobMasterConfiguration

commit 5d80afdf11ab35b17e00b846f00ba586880fc8d7
Author: Till Rohrmann <tr...@...>
Date:   2018-06-22T14:34:10Z

    [FLINK-9634] Disable local recovery scheduling if local recovery is disabled
    
    Introduce a SchedulingStrategy which is used by the SlotPool to schedule tasks.
    The default implementation is LocationPreferenceSchedulingStrategy which tries
    to schedule tasks to their preferred locations. In order to support local recovery
    the PreviousAllocationSchedulingStrategy schedules tasks to their previous
    allocation.
    
    The scheduling strategy is selected based on the configuration option
    state.backend.local-recovery. If set to true, then PreviousAllocationSchedulingStrategy
    is selected. Otherwise LocationPreferenceSchedulingStrategy is selected.

----


---

[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6208#discussion_r198421151
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolFactory.java ---
    @@ -75,10 +82,21 @@ public static DefaultSlotPoolFactory fromConfiguration(
     		final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
     		final Time slotIdleTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));
     
    +		final SchedulingStrategy schedulingStrategy = selectSchedulingStrategy(configuration);
    +
     		return new DefaultSlotPoolFactory(
     			rpcService,
    +			schedulingStrategy,
     			SystemClock.getInstance(),
     			rpcTimeout,
     			slotIdleTimeout);
     	}
    +
    +	private static SchedulingStrategy selectSchedulingStrategy(Configuration configuration) {
    +		if (configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY)) {
    +			return PreviousAllocationSchedulingStrategy.getInstance();
    +		} else {
    +			return LocationPreferenceSchedulingStrategy.getInstance();
    --- End diff --
    
    It is used if the `Execution` doesn't have a previous location assigned. You can see this in the implementation of the `PreviousAllocationSchedulingStrategy`, which extends `LocationPreferenceSchedulingStrategy`.
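
The fallback described in this comment can be sketched roughly as follows. This is a simplified illustration, not the Flink implementation: `Slot`, `Profile`, `LocationStrategy`, and `PreviousAllocationStrategy` are hypothetical stand-ins, and the matching logic is reduced to string comparisons. It only shows the control flow stated in the Javadoc of `PreviousAllocationSchedulingStrategy`: delegate to the location-preference logic when no previous allocation is assigned, otherwise match the previous allocation or return `null`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins; these are not the actual Flink classes.
class FallbackSketch {
    public static void main(String[] args) {
        List<Slot> slots = Arrays.asList(
            new Slot("alloc-1", "host-a"),
            new Slot("alloc-2", "host-b"));
        LocationStrategy strategy = new PreviousAllocationStrategy();

        // No previous allocation: the location preference decides.
        System.out.println(strategy.findMatch(new Profile(null, "host-b"), slots).allocationId);
        // Previous allocation known: it wins over the preferred host.
        System.out.println(strategy.findMatch(new Profile("alloc-1", "host-b"), slots).allocationId);
        // Previous allocation no longer among the candidates: no match.
        System.out.println(strategy.findMatch(new Profile("alloc-9", "host-b"), slots));
    }
}

class Slot {
    final String allocationId;
    final String host;

    Slot(String allocationId, String host) {
        this.allocationId = allocationId;
        this.host = host;
    }
}

class Profile {
    final String previousAllocationId; // null if the task was never scheduled before
    final String preferredHost;

    Profile(String previousAllocationId, String preferredHost) {
        this.previousAllocationId = previousAllocationId;
        this.preferredHost = preferredHost;
    }
}

class LocationStrategy {
    // Prefers a slot on the preferred host, otherwise takes the first candidate.
    Slot findMatch(Profile profile, List<Slot> candidates) {
        Slot fallback = null;
        for (Slot slot : candidates) {
            if (slot.host.equals(profile.preferredHost)) {
                return slot;
            }
            if (fallback == null) {
                fallback = slot;
            }
        }
        return fallback; // null if there are no candidates
    }
}

class PreviousAllocationStrategy extends LocationStrategy {
    @Override
    Slot findMatch(Profile profile, List<Slot> candidates) {
        if (profile.previousAllocationId == null) {
            // Initial scheduling: fall back to the location-preference logic.
            return super.findMatch(profile, candidates);
        }
        for (Slot slot : candidates) {
            if (slot.allocationId.equals(profile.previousAllocationId)) {
                return slot;
            }
        }
        return null; // previous allocation not found
    }
}
```

In this sketch the subclass only overrides the matching method, so the initial scheduling of a job behaves the same whether or not local recovery is enabled.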


---

[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6208#discussion_r198420806
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -280,12 +281,7 @@ public JobMaster(
     
     		resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
     
    -		this.slotPool = new SlotPool(
    -			rpcService,
    -			jobGraph.getJobID(),
    -			SystemClock.getInstance(),
    -			rpcTimeout,
    -			jobMasterConfiguration.getSlotIdleTimeout());
    +		this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jobGraph.getJobID());
    --- End diff --
    
    The reason why I introduced the factory is that we want to create a `SlotPool` for the `JobID` of the given `JobGraph`. That way we enforce that the `SlotPool` is always instantiated with the right `JobID`. If we moved the instantiation of the `SlotPool` out of the `JobMaster`, we would have to rely on the outside code to pass the right `JobID`. Therefore, I would like to keep it as it is.
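
The argument for the factory can be illustrated with a minimal sketch. All types here (`SketchJobMaster`, `SketchJobGraph`, `SketchSlotPool`, `SketchSlotPoolFactory`) are hypothetical stand-ins that model the `JobID` as a plain string; only the shape of the constructor call mirrors the diff above. The point is that the caller supplies a factory, while the job master itself supplies the `JobID`, so the pool cannot be built for a different job.

```java
import java.util.Objects;

// Hypothetical, simplified stand-ins; these are not the actual Flink classes.
class FactorySketch {
    public static void main(String[] args) {
        // The caller decides how pools are built, but not for which job.
        SketchJobMaster master =
            new SketchJobMaster(new SketchJobGraph("job-42"), SketchSlotPool::new);
        System.out.println(master.slotPool.jobId);
    }
}

interface SketchSlotPoolFactory {
    SketchSlotPool createSlotPool(String jobId);
}

class SketchSlotPool {
    final String jobId;

    SketchSlotPool(String jobId) {
        this.jobId = jobId;
    }
}

class SketchJobGraph {
    private final String jobId;

    SketchJobGraph(String jobId) {
        this.jobId = jobId;
    }

    String getJobID() {
        return jobId;
    }
}

class SketchJobMaster {
    final SketchSlotPool slotPool;

    SketchJobMaster(SketchJobGraph jobGraph, SketchSlotPoolFactory slotPoolFactory) {
        // The job ID always comes from the job graph owned by this job master.
        this.slotPool = Objects.requireNonNull(slotPoolFactory).createSlotPool(jobGraph.getJobID());
    }
}
```

Passing a ready-made pool into the constructor instead would also work in this sketch, but then nothing would tie the pool's job ID to the job graph.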


---

[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann closed the pull request at:

    https://github.com/apache/flink/pull/6208


---

[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6208#discussion_r198453459
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java ---
    @@ -0,0 +1,119 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster.slotpool;
    +
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.clusterframework.types.SlotProfile;
    +import org.apache.flink.runtime.jobmanager.scheduler.Locality;
    +import org.apache.flink.runtime.jobmaster.SlotContext;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.function.BiFunction;
    +import java.util.function.Function;
    +import java.util.function.Predicate;
    +import java.util.stream.Stream;
    +
    +/**
    + * Default {@link SchedulingStrategy} which tries to match a slot with its location preferences.
    + */
    +public class LocationPreferenceSchedulingStrategy implements SchedulingStrategy {
    +
    +	private static final LocationPreferenceSchedulingStrategy INSTANCE = new LocationPreferenceSchedulingStrategy();
    +
    +	/**
    +	 * Calculates the candidate's locality score.
    +	 */
    +	private static final BiFunction<Integer, Integer, Integer> LOCALITY_EVALUATION_FUNCTION = (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh;
    +
    +	LocationPreferenceSchedulingStrategy() {}
    --- End diff --
    
    👍 


---

[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6208#discussion_r198129785
  
    --- Diff: flink-tests/src/test/resources/log4j-test.properties ---
    @@ -18,7 +18,7 @@
     
     # Set root logger level to OFF to not flood build logs
     # set manually to INFO for debugging purposes
    -log4j.rootLogger=OFF, testlogger
    +log4j.rootLogger=INFO, testlogger
    --- End diff --
    
    This looks like it should be reverted.


---

[GitHub] flink issue #6208: [FLINK-9634] Disable local recovery scheduling if local r...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6208
  
    I think this is a good fix for the moment. Only had minor comments inline. LGTM 👍 


---

[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6208#discussion_r198122538
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -280,12 +281,7 @@ public JobMaster(
     
     		resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
     
    -		this.slotPool = new SlotPool(
    -			rpcService,
    -			jobGraph.getJobID(),
    -			SystemClock.getInstance(),
    -			rpcTimeout,
    -			jobMasterConfiguration.getSlotIdleTimeout());
    +		this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jobGraph.getJobID());
    --- End diff --
    
    After that, I also wonder if the factory still makes sense because we statically always use the same concrete implementation.


---

[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6208#discussion_r198146426
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster.slotpool;
    +
    +import org.apache.flink.runtime.clusterframework.types.SlotProfile;
    +import org.apache.flink.runtime.jobmanager.scheduler.Locality;
    +import org.apache.flink.runtime.jobmaster.SlotContext;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.function.BiFunction;
    +import java.util.function.Function;
    +import java.util.function.Predicate;
    +import java.util.stream.Stream;
    +
    +/**
    + * Classes that implement this interface provide a method to match objects to somehow represent slot candidates
    + * against the {@link SlotProfile} that produced the matcher object. A matching candidate is transformed into a
    + * desired result. If the matcher does not find a matching candidate, it returns null.
    + */
    +public interface SchedulingStrategy {
    +
    +	/**
    +	 * This method takes the candidate slots, extracts slot contexts from them, filters them by the profile
    +	 * requirements and potentially by additional requirements, and produces a result from a match.
    +	 *
    +	 * @param slotProfile slotProfile for which to find a matching slot
    --- End diff --
    
    Indentation looks a bit inconsistent.


---

[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6208#discussion_r198421680
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java ---
    @@ -0,0 +1,85 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster.slotpool;
    +
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
    +import org.apache.flink.runtime.clusterframework.types.SlotProfile;
    +import org.apache.flink.runtime.jobmanager.scheduler.Locality;
    +import org.apache.flink.runtime.jobmaster.SlotContext;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.function.BiFunction;
    +import java.util.function.Function;
    +import java.util.function.Predicate;
    +import java.util.stream.Stream;
    +
    +/**
    + * {@link SchedulingStrategy} which tries to match a slot with its previous {@link AllocationID}.
    + * If the previous allocation cannot be found, then it returns {@code null}. If the slot has not
    + * been scheduled before (no assigned allocation id), it will fall back to
    + * {@link LocationPreferenceSchedulingStrategy}.
    + */
    +public class PreviousAllocationSchedulingStrategy extends LocationPreferenceSchedulingStrategy {
    +
    +	private static final PreviousAllocationSchedulingStrategy INSTANCE = new PreviousAllocationSchedulingStrategy();
    +
    +	PreviousAllocationSchedulingStrategy() {}
    --- End diff --
    
    Yes, will add it.


---

[GitHub] flink issue #6208: [FLINK-9634] Disable local recovery scheduling if local r...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/6208
  
    Thanks for the review @StefanRRichter and @sihuazhou. I've addressed your comments. Merging this PR once Travis gives green light.


---

[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6208#discussion_r198142541
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java ---
    @@ -0,0 +1,85 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster.slotpool;
    +
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
    +import org.apache.flink.runtime.clusterframework.types.SlotProfile;
    +import org.apache.flink.runtime.jobmanager.scheduler.Locality;
    +import org.apache.flink.runtime.jobmaster.SlotContext;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.function.BiFunction;
    +import java.util.function.Function;
    +import java.util.function.Predicate;
    +import java.util.stream.Stream;
    +
    +/**
    + * {@link SchedulingStrategy} which tries to match a slot with its previous {@link AllocationID}.
    + * If the previous allocation cannot be found, then it returns {@code null}. If the slot has not
    + * been scheduled before (no assigned allocation id), it will fall back to
    + * {@link LocationPreferenceSchedulingStrategy}.
    + */
    +public class PreviousAllocationSchedulingStrategy extends LocationPreferenceSchedulingStrategy {
    +
    +	private static final PreviousAllocationSchedulingStrategy INSTANCE = new PreviousAllocationSchedulingStrategy();
    +
    +	PreviousAllocationSchedulingStrategy() {}
    --- End diff --
    
    can be `private`


---

[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6208#discussion_r198129120
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolFactory.java ---
    @@ -75,10 +82,21 @@ public static DefaultSlotPoolFactory fromConfiguration(
     		final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
     		final Time slotIdleTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));
     
    +		final SchedulingStrategy schedulingStrategy = selectSchedulingStrategy(configuration);
    +
     		return new DefaultSlotPoolFactory(
     			rpcService,
    +			schedulingStrategy,
     			SystemClock.getInstance(),
     			rpcTimeout,
     			slotIdleTimeout);
     	}
    +
    +	private static SchedulingStrategy selectSchedulingStrategy(Configuration configuration) {
    +		if (configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY)) {
    +			return PreviousAllocationSchedulingStrategy.getInstance();
    +		} else {
    +			return LocationPreferenceSchedulingStrategy.getInstance();
    --- End diff --
    
    Does it make sense to use the `LocationPreferenceSchedulingStrategy` for the initial scheduling even when "local recovery" is enabled?


---

[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6208#discussion_r198121746
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -280,12 +281,7 @@ public JobMaster(
     
     		resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
     
    -		this.slotPool = new SlotPool(
    -			rpcService,
    -			jobGraph.getJobID(),
    -			SystemClock.getInstance(),
    -			rpcTimeout,
    -			jobMasterConfiguration.getSlotIdleTimeout());
    +		this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jobGraph.getJobID());
    --- End diff --
    
    It seems like you could also construct the `SlotPool` outside and just pass it directly into the constructor. The factory is only used to deliver the `SlotPool` object and no reference is kept.


---

[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6208#discussion_r198421252
  
    --- Diff: flink-tests/src/test/resources/log4j-test.properties ---
    @@ -18,7 +18,7 @@
     
     # Set root logger level to OFF to not flood build logs
     # set manually to INFO for debugging purposes
    -log4j.rootLogger=OFF, testlogger
    +log4j.rootLogger=INFO, testlogger
    --- End diff --
    
    Definitely.


---

[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6208#discussion_r198421635
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java ---
    @@ -0,0 +1,119 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster.slotpool;
    +
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.clusterframework.types.SlotProfile;
    +import org.apache.flink.runtime.jobmanager.scheduler.Locality;
    +import org.apache.flink.runtime.jobmaster.SlotContext;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.function.BiFunction;
    +import java.util.function.Function;
    +import java.util.function.Predicate;
    +import java.util.stream.Stream;
    +
    +/**
    + * Default {@link SchedulingStrategy} which tries to match a slot with its location preferences.
    + */
    +public class LocationPreferenceSchedulingStrategy implements SchedulingStrategy {
    +
    +	private static final LocationPreferenceSchedulingStrategy INSTANCE = new LocationPreferenceSchedulingStrategy();
    +
    +	/**
    +	 * Calculates the candidate's locality score.
    +	 */
    +	private static final BiFunction<Integer, Integer, Integer> LOCALITY_EVALUATION_FUNCTION = (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh;
    +
    +	LocationPreferenceSchedulingStrategy() {}
    --- End diff --
    
    I don't think so, since `PreviousAllocationSchedulingStrategy` extends this class.
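
The visibility constraint mentioned here follows from Java's access rules: a subclass constructor implicitly calls `super()`, and a `private` constructor is not accessible from a different top-level class. A minimal sketch, using the hypothetical names `BaseStrategy` and `DerivedStrategy` in place of the two strategy classes:

```java
// Hypothetical stand-ins for the two strategy classes; not the Flink code.
class ConstructorVisibilitySketch {
    public static void main(String[] args) {
        System.out.println(BaseStrategy.getInstance().name());
        System.out.println(DerivedStrategy.getInstance().name());
    }
}

class BaseStrategy {
    private static final BaseStrategy INSTANCE = new BaseStrategy();

    // Package-private on purpose: if this were private, the implicit super()
    // call in DerivedStrategy's constructor would not compile.
    BaseStrategy() {}

    static BaseStrategy getInstance() {
        return INSTANCE;
    }

    String name() {
        return "location-preference";
    }
}

class DerivedStrategy extends BaseStrategy {
    private static final DerivedStrategy INSTANCE = new DerivedStrategy();

    // Nothing extends this class, so its constructor can be private.
    private DerivedStrategy() {}

    static DerivedStrategy getInstance() {
        return INSTANCE;
    }

    @Override
    String name() {
        return "previous-allocation";
    }
}
```

This matches the outcome of the review: the subclass constructor can be made `private`, while the base class constructor has to stay at least package-private.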


---

[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6208#discussion_r198421923
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster.slotpool;
    +
    +import org.apache.flink.runtime.clusterframework.types.SlotProfile;
    +import org.apache.flink.runtime.jobmanager.scheduler.Locality;
    +import org.apache.flink.runtime.jobmaster.SlotContext;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.function.BiFunction;
    +import java.util.function.Function;
    +import java.util.function.Predicate;
    +import java.util.stream.Stream;
    +
    +/**
    + * Classes that implement this interface provide a method to match objects to somehow represent slot candidates
    + * against the {@link SlotProfile} that produced the matcher object. A matching candidate is transformed into a
    + * desired result. If the matcher does not find a matching candidate, it returns null.
    + */
    +public interface SchedulingStrategy {
    +
    +	/**
    +	 * This method takes the candidate slots, extracts slot contexts from them, filters them by the profile
    +	 * requirements and potentially by additional requirements, and produces a result from a match.
    +	 *
    +	 * @param slotProfile slotProfile for which to find a matching slot
    --- End diff --
    
    Will correct it.


---

[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6208#discussion_r198142470
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java ---
    @@ -0,0 +1,119 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster.slotpool;
    +
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.clusterframework.types.SlotProfile;
    +import org.apache.flink.runtime.jobmanager.scheduler.Locality;
    +import org.apache.flink.runtime.jobmaster.SlotContext;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.function.BiFunction;
    +import java.util.function.Function;
    +import java.util.function.Predicate;
    +import java.util.stream.Stream;
    +
    +/**
    + * Default {@link SchedulingStrategy} which tries to match a slot with its location preferences.
    + */
    +public class LocationPreferenceSchedulingStrategy implements SchedulingStrategy {
    +
    +	private static final LocationPreferenceSchedulingStrategy INSTANCE = new LocationPreferenceSchedulingStrategy();
    +
    +	/**
    +	 * Calculates the candidate's locality score.
    +	 */
    +	private static final BiFunction<Integer, Integer, Integer> LOCALITY_EVALUATION_FUNCTION = (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh;
    +
    +	LocationPreferenceSchedulingStrategy() {}
    --- End diff --
    
    can be `private`.


---

[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6208#discussion_r198146744
  
    --- Diff: flink-tests/src/test/resources/log4j-test.properties ---
    @@ -18,7 +18,7 @@
     
     # Set root logger level to OFF to not flood build logs
     # set manually to INFO for debugging purposes
    -log4j.rootLogger=OFF, testlogger
    +log4j.rootLogger=INFO, testlogger
    --- End diff --
    
    Revert?


---