You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/09/11 14:25:27 UTC
[flink] 03/03: [FLINK-17016][runtime] Enable to use pipelined region scheduling strategy
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4df2295f54709f4292888a3b4fcbb019dd4d7901
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Sep 10 20:31:42 2020 +0800
[FLINK-17016][runtime] Enable to use pipelined region scheduling strategy
It can be enabled via config option "jobmanager.scheduler.scheduling-strategy=region"
---
.../flink/configuration/JobManagerOptions.java | 18 ++
.../flink/runtime/scheduler/DefaultScheduler.java | 51 ++++-
...actory.java => DefaultSchedulerComponents.java} | 157 ++++++----------
.../runtime/scheduler/DefaultSchedulerFactory.java | 110 +----------
.../scheduler/ExecutionSlotAllocationContext.java | 99 ++++------
.../DefaultSchedulerComponentsFactoryTest.java | 77 ++++++++
.../runtime/scheduler/SchedulerTestingUtils.java | 6 +-
.../PipelinedRegionSchedulingITCase.java | 205 +++++++++++++++++++++
8 files changed, 450 insertions(+), 273 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index aa3dc9a..ed8be72 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -326,6 +326,7 @@ public class JobManagerOptions {
// default matches heartbeat.timeout so that sticky allocation is not lost on timeouts for local recovery
.defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
.withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");
+
/**
* Config parameter determining the scheduler implementation.
*/
@@ -339,6 +340,23 @@ public class JobManagerOptions {
.list(
text("'ng': new generation scheduler"))
.build());
+
+ /**
+ * Config parameter determining the scheduling strategy.
+ */
+ @Documentation.ExcludeFromDocumentation("User normally should not be expected to change this config.")
+ public static final ConfigOption<String> SCHEDULING_STRATEGY =
+ key("jobmanager.scheduler.scheduling-strategy")
+ .stringType()
+ .defaultValue("legacy")
+ .withDescription(Description.builder()
+ .text("Determines which scheduling strategy is used to schedule tasks. Accepted values are:")
+ .list(
+ text("'region': pipelined region scheduling"),
+ text("'legacy': legacy scheduling strategy, which is eager scheduling for streaming jobs " +
+ "and lazy from sources scheduling for batch jobs"))
+ .build());
+
/**
* Config parameter controlling whether partitions should already be released during the job execution.
*/
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index db748bd..fa27baa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -38,7 +40,9 @@ import org.apache.flink.runtime.executiongraph.restart.ThrowingRestartStrategy;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.ThrowingSlotProvider;
@@ -48,8 +52,10 @@ import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureSta
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
@@ -161,14 +167,8 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
restartBackoffTimeStrategy);
this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology());
- final ExecutionSlotAllocationContext slotAllocationContext = new ExecutionSlotAllocationContext(
- getPreferredLocationsRetriever(),
- executionVertexID -> getExecutionVertex(executionVertexID).getResourceProfile(),
- executionVertexID -> getExecutionVertex(executionVertexID).getLatestPriorAllocation(),
- getSchedulingTopology(),
- () -> getJobGraph().getSlotSharingGroups(),
- () -> getJobGraph().getCoLocationGroupDescriptors());
- this.executionSlotAllocator = checkNotNull(executionSlotAllocatorFactory).createInstance(slotAllocationContext);
+ this.executionSlotAllocator = checkNotNull(executionSlotAllocatorFactory)
+ .createInstance(new DefaultExecutionSlotAllocationContext());
this.verticesWaitingForRestart = new HashSet<>();
this.startUpAction = startUpAction;
@@ -513,4 +513,39 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
coordinator.subtaskFailed(vertex.getParallelSubtaskIndex(), null);
}
}
+
+ private class DefaultExecutionSlotAllocationContext implements ExecutionSlotAllocationContext {
+
+ @Override
+ public CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocations(
+ final ExecutionVertexID executionVertexId,
+ final Set<ExecutionVertexID> producersToIgnore) {
+ return getPreferredLocationsRetriever().getPreferredLocations(executionVertexId, producersToIgnore);
+ }
+
+ @Override
+ public ResourceProfile getResourceProfile(final ExecutionVertexID executionVertexId) {
+ return getExecutionVertex(executionVertexId).getResourceProfile();
+ }
+
+ @Override
+ public AllocationID getPriorAllocationId(final ExecutionVertexID executionVertexId) {
+ return getExecutionVertex(executionVertexId).getLatestPriorAllocation();
+ }
+
+ @Override
+ public SchedulingTopology getSchedulingTopology() {
+ return DefaultScheduler.this.getSchedulingTopology();
+ }
+
+ @Override
+ public Set<SlotSharingGroup> getLogicalSlotSharingGroups() {
+ return getJobGraph().getSlotSharingGroups();
+ }
+
+ @Override
+ public Set<CoLocationGroupDesc> getCoLocationGroups() {
+ return getJobGraph().getCoLocationGroupDescriptors();
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
similarity index 55%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
index 84f3143..a99325a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
@@ -23,18 +23,10 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobWriter;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
-import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader;
-import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
-import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
-import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
@@ -45,93 +37,80 @@ import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
-import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
-import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
-import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.util.clock.SystemClock;
-import org.slf4j.Logger;
-
-import javax.annotation.Nonnull;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
/**
- * Factory for {@link DefaultScheduler}.
+ * Components to create a {@link DefaultScheduler} which depends on the
+ * configured {@link JobManagerOptions#SCHEDULING_STRATEGY}.
*/
-public class DefaultSchedulerFactory implements SchedulerNGFactory {
-
- @Override
- public SchedulerNG createInstance(
- final Logger log,
- final JobGraph jobGraph,
- final BackPressureStatsTracker backPressureStatsTracker,
- final Executor ioExecutor,
+public class DefaultSchedulerComponents {
+
+ private static final String PIPELINED_REGION_SCHEDULING = "region";
+ private static final String LEGACY_SCHEDULING = "legacy";
+
+ private final SchedulingStrategyFactory schedulingStrategyFactory;
+ private final Consumer<ComponentMainThreadExecutor> startUpAction;
+ private final ExecutionSlotAllocatorFactory allocatorFactory;
+
+ private DefaultSchedulerComponents(
+ final SchedulingStrategyFactory schedulingStrategyFactory,
+ final Consumer<ComponentMainThreadExecutor> startUpAction,
+ final ExecutionSlotAllocatorFactory allocatorFactory) {
+
+ this.schedulingStrategyFactory = schedulingStrategyFactory;
+ this.startUpAction = startUpAction;
+ this.allocatorFactory = allocatorFactory;
+ }
+
+ SchedulingStrategyFactory getSchedulingStrategyFactory() {
+ return schedulingStrategyFactory;
+ }
+
+ Consumer<ComponentMainThreadExecutor> getStartUpAction() {
+ return startUpAction;
+ }
+
+ ExecutionSlotAllocatorFactory getAllocatorFactory() {
+ return allocatorFactory;
+ }
+
+ static DefaultSchedulerComponents createSchedulerComponents(
+ final ScheduleMode scheduleMode,
final Configuration jobMasterConfiguration,
final SlotPool slotPool,
- final ScheduledExecutorService futureExecutor,
- final ClassLoader userCodeLoader,
- final CheckpointRecoveryFactory checkpointRecoveryFactory,
- final Time rpcTimeout,
- final BlobWriter blobWriter,
- final JobManagerJobMetricGroup jobManagerJobMetricGroup,
- final Time slotRequestTimeout,
- final ShuffleMaster<?> shuffleMaster,
- final JobMasterPartitionTracker partitionTracker,
- final ExecutionDeploymentTracker executionDeploymentTracker) throws Exception {
-
- final DefaultSchedulerComponents schedulerComponents = createDefaultSchedulerComponents(
- jobGraph.getScheduleMode(),
- jobMasterConfiguration,
- slotPool,
- slotRequestTimeout);
- final RestartBackoffTimeStrategy restartBackoffTimeStrategy = RestartBackoffTimeStrategyFactoryLoader
- .createRestartBackoffTimeStrategyFactory(
- jobGraph
- .getSerializedExecutionConfig()
- .deserializeValue(userCodeLoader)
- .getRestartStrategy(),
- jobMasterConfiguration,
- jobGraph.isCheckpointingEnabled())
- .create();
- log.info("Using restart back off time strategy {} for {} ({}).", restartBackoffTimeStrategy, jobGraph.getName(), jobGraph.getJobID());
-
- return new DefaultScheduler(
- log,
- jobGraph,
- backPressureStatsTracker,
- ioExecutor,
- jobMasterConfiguration,
- schedulerComponents.startUpAction,
- futureExecutor,
- new ScheduledExecutorServiceAdapter(futureExecutor),
- userCodeLoader,
- checkpointRecoveryFactory,
- rpcTimeout,
- blobWriter,
- jobManagerJobMetricGroup,
- shuffleMaster,
- partitionTracker,
- schedulerComponents.schedulingStrategyFactory,
- FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
- restartBackoffTimeStrategy,
- new DefaultExecutionVertexOperations(),
- new ExecutionVertexVersioner(),
- schedulerComponents.allocatorFactory,
- executionDeploymentTracker);
+ final Time slotRequestTimeout) {
+
+ final String schedulingStrategy = jobMasterConfiguration.getString(JobManagerOptions.SCHEDULING_STRATEGY);
+ switch (schedulingStrategy) {
+ case PIPELINED_REGION_SCHEDULING:
+ return createPipelinedRegionSchedulerComponents(
+ scheduleMode,
+ jobMasterConfiguration,
+ slotPool,
+ slotRequestTimeout);
+ case LEGACY_SCHEDULING:
+ return createLegacySchedulerComponents(
+ scheduleMode,
+ jobMasterConfiguration,
+ slotPool,
+ slotRequestTimeout);
+ default:
+ throw new IllegalStateException("Unsupported scheduling strategy " + schedulingStrategy);
+ }
}
- private static DefaultSchedulerComponents createDefaultSchedulerComponents(
+ private static DefaultSchedulerComponents createLegacySchedulerComponents(
final ScheduleMode scheduleMode,
final Configuration jobMasterConfiguration,
final SlotPool slotPool,
final Time slotRequestTimeout) {
+
final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration);
final Scheduler scheduler = new SchedulerImpl(slotSelectionStrategy, slotPool);
final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from(
@@ -139,12 +118,12 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
scheduler,
slotRequestTimeout);
return new DefaultSchedulerComponents(
- createSchedulingStrategyFactory(scheduleMode),
+ createLegacySchedulingStrategyFactory(scheduleMode),
scheduler::start,
new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy));
}
- static SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
+ private static SchedulingStrategyFactory createLegacySchedulingStrategyFactory(final ScheduleMode scheduleMode) {
switch (scheduleMode) {
case EAGER:
return new EagerSchedulingStrategy.Factory();
@@ -161,6 +140,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
final Configuration jobMasterConfiguration,
final SlotPool slotPool,
final Time slotRequestTimeout) {
+
final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration);
final PhysicalSlotRequestBulkChecker bulkChecker = PhysicalSlotRequestBulkCheckerImpl
.createFromSlotPool(slotPool, SystemClock.getInstance());
@@ -176,7 +156,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
allocatorFactory);
}
- private static SlotSelectionStrategy selectSlotSelectionStrategy(@Nonnull Configuration configuration) {
+ private static SlotSelectionStrategy selectSlotSelectionStrategy(final Configuration configuration) {
final boolean evenlySpreadOutSlots = configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
final SlotSelectionStrategy locationPreferenceSlotSelectionStrategy;
@@ -189,19 +169,4 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
PreviousAllocationSlotSelectionStrategy.create(locationPreferenceSlotSelectionStrategy) :
locationPreferenceSlotSelectionStrategy;
}
-
- private static class DefaultSchedulerComponents {
- private final SchedulingStrategyFactory schedulingStrategyFactory;
- private final Consumer<ComponentMainThreadExecutor> startUpAction;
- private final ExecutionSlotAllocatorFactory allocatorFactory;
-
- private DefaultSchedulerComponents(
- final SchedulingStrategyFactory schedulingStrategyFactory,
- final Consumer<ComponentMainThreadExecutor> startUpAction,
- final ExecutionSlotAllocatorFactory allocatorFactory) {
- this.schedulingStrategyFactory = schedulingStrategyFactory;
- this.startUpAction = startUpAction;
- this.allocatorFactory = allocatorFactory;
- }
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 84f3143..61b7620 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -20,47 +20,27 @@
package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
-import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
-import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
-import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
-import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
-import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
-import org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
-import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
-import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
-import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
-import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
-import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
-import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
-import org.apache.flink.util.clock.SystemClock;
import org.slf4j.Logger;
-import javax.annotation.Nonnull;
-
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Consumer;
+
+import static org.apache.flink.runtime.scheduler.DefaultSchedulerComponents.createSchedulerComponents;
/**
* Factory for {@link DefaultScheduler}.
@@ -86,7 +66,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
final JobMasterPartitionTracker partitionTracker,
final ExecutionDeploymentTracker executionDeploymentTracker) throws Exception {
- final DefaultSchedulerComponents schedulerComponents = createDefaultSchedulerComponents(
+ final DefaultSchedulerComponents schedulerComponents = createSchedulerComponents(
jobGraph.getScheduleMode(),
jobMasterConfiguration,
slotPool,
@@ -108,7 +88,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
backPressureStatsTracker,
ioExecutor,
jobMasterConfiguration,
- schedulerComponents.startUpAction,
+ schedulerComponents.getStartUpAction(),
futureExecutor,
new ScheduledExecutorServiceAdapter(futureExecutor),
userCodeLoader,
@@ -118,90 +98,12 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
jobManagerJobMetricGroup,
shuffleMaster,
partitionTracker,
- schedulerComponents.schedulingStrategyFactory,
+ schedulerComponents.getSchedulingStrategyFactory(),
FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
restartBackoffTimeStrategy,
new DefaultExecutionVertexOperations(),
new ExecutionVertexVersioner(),
- schedulerComponents.allocatorFactory,
+ schedulerComponents.getAllocatorFactory(),
executionDeploymentTracker);
}
-
- private static DefaultSchedulerComponents createDefaultSchedulerComponents(
- final ScheduleMode scheduleMode,
- final Configuration jobMasterConfiguration,
- final SlotPool slotPool,
- final Time slotRequestTimeout) {
- final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration);
- final Scheduler scheduler = new SchedulerImpl(slotSelectionStrategy, slotPool);
- final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from(
- scheduleMode,
- scheduler,
- slotRequestTimeout);
- return new DefaultSchedulerComponents(
- createSchedulingStrategyFactory(scheduleMode),
- scheduler::start,
- new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy));
- }
-
- static SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
- switch (scheduleMode) {
- case EAGER:
- return new EagerSchedulingStrategy.Factory();
- case LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST:
- case LAZY_FROM_SOURCES:
- return new LazyFromSourcesSchedulingStrategy.Factory();
- default:
- throw new IllegalStateException("Unsupported schedule mode " + scheduleMode);
- }
- }
-
- private static DefaultSchedulerComponents createPipelinedRegionSchedulerComponents(
- final ScheduleMode scheduleMode,
- final Configuration jobMasterConfiguration,
- final SlotPool slotPool,
- final Time slotRequestTimeout) {
- final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration);
- final PhysicalSlotRequestBulkChecker bulkChecker = PhysicalSlotRequestBulkCheckerImpl
- .createFromSlotPool(slotPool, SystemClock.getInstance());
- final PhysicalSlotProvider physicalSlotProvider = new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
- final ExecutionSlotAllocatorFactory allocatorFactory = new SlotSharingExecutionSlotAllocatorFactory(
- physicalSlotProvider,
- scheduleMode != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
- bulkChecker,
- slotRequestTimeout);
- return new DefaultSchedulerComponents(
- new PipelinedRegionSchedulingStrategy.Factory(),
- bulkChecker::start,
- allocatorFactory);
- }
-
- private static SlotSelectionStrategy selectSlotSelectionStrategy(@Nonnull Configuration configuration) {
- final boolean evenlySpreadOutSlots = configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
-
- final SlotSelectionStrategy locationPreferenceSlotSelectionStrategy;
-
- locationPreferenceSlotSelectionStrategy = evenlySpreadOutSlots ?
- LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut() :
- LocationPreferenceSlotSelectionStrategy.createDefault();
-
- return configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY) ?
- PreviousAllocationSlotSelectionStrategy.create(locationPreferenceSlotSelectionStrategy) :
- locationPreferenceSlotSelectionStrategy;
- }
-
- private static class DefaultSchedulerComponents {
- private final SchedulingStrategyFactory schedulingStrategyFactory;
- private final Consumer<ComponentMainThreadExecutor> startUpAction;
- private final ExecutionSlotAllocatorFactory allocatorFactory;
-
- private DefaultSchedulerComponents(
- final SchedulingStrategyFactory schedulingStrategyFactory,
- final Consumer<ComponentMainThreadExecutor> startUpAction,
- final ExecutionSlotAllocatorFactory allocatorFactory) {
- this.schedulingStrategyFactory = schedulingStrategyFactory;
- this.startUpAction = startUpAction;
- this.allocatorFactory = allocatorFactory;
- }
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
index 8180c1f..268ce89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
@@ -25,71 +25,48 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import java.util.Collection;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-import java.util.function.Supplier;
/**
* Context for slot allocation.
*/
-class ExecutionSlotAllocationContext implements PreferredLocationsRetriever {
-
- private final PreferredLocationsRetriever preferredLocationsRetriever;
-
- private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;
-
- private final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever;
-
- private final SchedulingTopology schedulingTopology;
-
- private final Supplier<Set<SlotSharingGroup>> logicalSlotSharingGroupSupplier;
-
- private final Supplier<Set<CoLocationGroupDesc>> coLocationGroupSupplier;
-
- ExecutionSlotAllocationContext(
- final PreferredLocationsRetriever preferredLocationsRetriever,
- final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever,
- final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever,
- final SchedulingTopology schedulingTopology,
- final Supplier<Set<SlotSharingGroup>> logicalSlotSharingGroupSupplier,
- final Supplier<Set<CoLocationGroupDesc>> coLocationGroupSupplier) {
-
- this.preferredLocationsRetriever = preferredLocationsRetriever;
- this.resourceProfileRetriever = resourceProfileRetriever;
- this.priorAllocationIdRetriever = priorAllocationIdRetriever;
- this.schedulingTopology = schedulingTopology;
- this.logicalSlotSharingGroupSupplier = logicalSlotSharingGroupSupplier;
- this.coLocationGroupSupplier = coLocationGroupSupplier;
- }
-
- @Override
- public CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocations(
- final ExecutionVertexID executionVertexId,
- final Set<ExecutionVertexID> producersToIgnore) {
- return preferredLocationsRetriever.getPreferredLocations(executionVertexId, producersToIgnore);
- }
-
- ResourceProfile getResourceProfile(final ExecutionVertexID executionVertexId) {
- return resourceProfileRetriever.apply(executionVertexId);
- }
-
- AllocationID getPriorAllocationId(final ExecutionVertexID executionVertexId) {
- return priorAllocationIdRetriever.apply(executionVertexId);
- }
-
- SchedulingTopology getSchedulingTopology() {
- return schedulingTopology;
- }
-
- Set<SlotSharingGroup> getLogicalSlotSharingGroups() {
- return logicalSlotSharingGroupSupplier.get();
- }
-
- Set<CoLocationGroupDesc> getCoLocationGroups() {
- return coLocationGroupSupplier.get();
- }
+interface ExecutionSlotAllocationContext extends PreferredLocationsRetriever {
+
+ /**
+ * Returns required resources for an execution vertex.
+ *
+ * @param executionVertexId id of the execution vertex
+ * @return required resources for the given execution vertex
+ */
+ ResourceProfile getResourceProfile(ExecutionVertexID executionVertexId);
+
+ /**
+ * Returns prior allocation id for an execution vertex.
+ *
+ * @param executionVertexId id of the execution vertex
+ * @return prior allocation id for the given execution vertex
+ */
+ AllocationID getPriorAllocationId(ExecutionVertexID executionVertexId);
+
+ /**
+ * Returns the scheduling topology containing all execution vertices and edges.
+ *
+ * @return scheduling topology
+ */
+ SchedulingTopology getSchedulingTopology();
+
+ /**
+ * Returns all slot sharing groups in the job.
+ *
+ * @return all slot sharing groups in the job
+ */
+ Set<SlotSharingGroup> getLogicalSlotSharingGroups();
+
+ /**
+ * Returns all co-location groups in the job.
+ *
+ * @return all co-location groups in the job
+ */
+ Set<CoLocationGroupDesc> getCoLocationGroups();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
new file mode 100644
index 0000000..572ab3a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
+import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the factory method {@link DefaultSchedulerComponents#createSchedulerComponents(
+ * ScheduleMode, Configuration, SlotPool, Time)}.
+ *
+ * <p>Each test checks which scheduling strategy factory and which execution slot allocator
+ * factory are selected for a given value of {@link JobManagerOptions#SCHEDULING_STRATEGY}.
+ */
+public class DefaultSchedulerComponentsFactoryTest extends TestLogger {
+
+	/**
+	 * The "region" strategy must select the pipelined region scheduling strategy factory
+	 * together with the slot sharing allocator factory.
+	 */
+	@Test
+	public void testCreatingPipelinedSchedulingStrategyFactory() {
+		final Configuration configuration = new Configuration();
+		configuration.setString(JobManagerOptions.SCHEDULING_STRATEGY, "region");
+
+		final DefaultSchedulerComponents components = createSchedulerComponents(configuration);
+		assertThat(components.getSchedulingStrategyFactory(), instanceOf(PipelinedRegionSchedulingStrategy.Factory.class));
+		assertThat(components.getAllocatorFactory(), instanceOf(SlotSharingExecutionSlotAllocatorFactory.class));
+	}
+
+	/**
+	 * The "legacy" strategy must select the lazy-from-sources scheduling strategy factory
+	 * (matching the schedule mode used by this test) together with the default allocator factory.
+	 */
+	@Test
+	public void testCreatingLegacySchedulingStrategyFactory() {
+		final Configuration configuration = new Configuration();
+		configuration.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy");
+
+		final DefaultSchedulerComponents components = createSchedulerComponents(configuration);
+		assertThat(components.getSchedulingStrategyFactory(), instanceOf(LazyFromSourcesSchedulingStrategy.Factory.class));
+		assertThat(components.getAllocatorFactory(), instanceOf(DefaultExecutionSlotAllocatorFactory.class));
+	}
+
+	/**
+	 * An empty configuration must behave like the explicit "legacy" setting, for both the
+	 * scheduling strategy factory and the allocator factory.
+	 */
+	@Test
+	public void testCreatingLegacySchedulingStrategyFactoryByDefault() {
+		final DefaultSchedulerComponents components = createSchedulerComponents(new Configuration());
+		assertThat(components.getSchedulingStrategyFactory(), instanceOf(LazyFromSourcesSchedulingStrategy.Factory.class));
+		// Pin the allocator factory as well, as the explicit "legacy" test does, so that a change
+		// of the default allocator cannot go unnoticed.
+		assertThat(components.getAllocatorFactory(), instanceOf(DefaultExecutionSlotAllocatorFactory.class));
+	}
+
+	/**
+	 * Creates the scheduler components for the given configuration. All other inputs are fixed:
+	 * the schedule mode is {@link ScheduleMode#LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST}, the
+	 * slot pool is a fresh {@link TestingSlotPoolImpl} and the time argument is 10 milliseconds.
+	 *
+	 * @param configuration configuration handed to the factory method
+	 * @return the components created by the factory method
+	 */
+	private static DefaultSchedulerComponents createSchedulerComponents(final Configuration configuration) {
+		return DefaultSchedulerComponents.createSchedulerComponents(
+			ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+			configuration,
+			new TestingSlotPoolImpl(new JobID()),
+			Time.milliseconds(10L));
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index ec88777..c92b729 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -65,6 +65,7 @@ import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureSta
import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
@@ -368,7 +369,7 @@ public class SchedulerTestingUtils {
public static class DefaultSchedulerBuilder {
private final JobGraph jobGraph;
- private SchedulingStrategyFactory schedulingStrategyFactory;
+ private SchedulingStrategyFactory schedulingStrategyFactory = new PipelinedRegionSchedulingStrategy.Factory();
private Logger log = LOG;
private BackPressureStatsTracker backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE;
@@ -391,9 +392,6 @@ public class SchedulerTestingUtils {
private DefaultSchedulerBuilder(final JobGraph jobGraph) {
this.jobGraph = jobGraph;
-
- // scheduling strategy is by default set according to the scheduleMode. It can be re-assigned later.
- this.schedulingStrategyFactory = DefaultSchedulerFactory.createSchedulingStrategyFactory(jobGraph.getScheduleMode());
}
public DefaultSchedulerBuilder setLogger(final Logger log) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java
new file mode 100644
index 0000000..df2c1ec
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT case for pipelined region scheduling.
+ *
+ * <p>Every test runs the job built by {@link #createJobGraph(int)} on a {@link MiniCluster}
+ * with a single task manager and the scheduling strategy set to "region"; only the number of
+ * slots offered by that task manager differs between the tests.
+ */
+public class PipelinedRegionSchedulingITCase extends TestLogger {
+
+	/**
+	 * With 2 slots the job is expected to finish without a failure cause.
+	 */
+	@Test
+	public void testSuccessWithSlotsNoFewerThanTheMaxRegionRequired() throws Exception {
+		final JobResult jobResult = executeSchedulingTest(2);
+		assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
+	}
+
+	/**
+	 * With only 1 slot the job is expected to fail, and the failure must be caused by a
+	 * {@link NoResourceAvailableException} reporting an unfulfillable slot request bulk.
+	 */
+	@Test
+	public void testFailsOnInsufficientSlots() throws Exception {
+		final JobResult jobResult = executeSchedulingTest(1);
+		assertThat(jobResult.getSerializedThrowable().isPresent(), is(true));
+
+		final Throwable jobFailure = jobResult
+			.getSerializedThrowable()
+			.get()
+			.deserializeError(ClassLoader.getSystemClassLoader());
+
+		final Optional<NoResourceAvailableException> cause = ExceptionUtils.findThrowable(
+			jobFailure,
+			NoResourceAvailableException.class);
+		assertThat(cause.isPresent(), is(true));
+		assertThat(cause.get().getMessage(), containsString("Slot request bulk is not fulfillable!"));
+	}
+
+	/**
+	 * Starts a mini cluster with one task manager offering {@code numSlots} slots, submits the
+	 * test job with base parallelism 10 and blocks until its result is available. The mini
+	 * cluster is closed before this method returns.
+	 *
+	 * @param numSlots number of slots of the single task manager
+	 * @return the result of the executed job
+	 */
+	private JobResult executeSchedulingTest(int numSlots) throws Exception {
+		final Configuration configuration = new Configuration();
+		// NOTE(review): port "0" presumably lets the REST endpoint bind to any free port - confirm.
+		configuration.setString(RestOptions.BIND_PORT, "0");
+		// NOTE(review): presumably milliseconds; kept short so the insufficient-slots case fails fast.
+		configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L);
+		configuration.setString(JobManagerOptions.SCHEDULING_STRATEGY, "region");
+
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(configuration)
+			.setNumTaskManagers(1)
+			.setNumSlotsPerTaskManager(numSlots)
+			.build();
+
+		try (MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) {
+			miniCluster.start();
+
+			final MiniClusterClient miniClusterClient = new MiniClusterClient(configuration, miniCluster);
+
+			final JobGraph jobGraph = createJobGraph(10);
+
+			// wait for the submission to succeed
+			final JobID jobID = miniClusterClient.submitJob(jobGraph).get();
+
+			final CompletableFuture<JobResult> resultFuture = miniClusterClient.requestJobResult(jobID);
+
+			// block until the job has reached a terminal state
+			return resultFuture.get();
+		}
+	}
+
+	/**
+	 * Builds the test job, which consists of three vertices:
+	 * <ul>
+	 *   <li>{@code source1}: {@link PipelinedSender}, parallelism {@code 2 * parallelism},
+	 *       slot sharing group 1, connected to the sink POINTWISE via a PIPELINED result</li>
+	 *   <li>{@code source2}: {@link NoOpInvokable}, parallelism {@code parallelism},
+	 *       slot sharing group 2, connected to the sink ALL_TO_ALL via a BLOCKING result</li>
+	 *   <li>{@code sink}: {@link Receiver}, parallelism {@code parallelism},
+	 *       slot sharing group 1</li>
+	 * </ul>
+	 *
+	 * @param parallelism base parallelism of the job
+	 * @return the job graph, using schedule mode LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST
+	 */
+	private JobGraph createJobGraph(final int parallelism) {
+		final SlotSharingGroup group1 = new SlotSharingGroup();
+		final JobVertex source1 = new JobVertex("source1");
+		source1.setInvokableClass(PipelinedSender.class);
+		source1.setParallelism(parallelism * 2);
+		source1.setSlotSharingGroup(group1);
+
+		final SlotSharingGroup group2 = new SlotSharingGroup();
+		final JobVertex source2 = new JobVertex("source2");
+		source2.setInvokableClass(NoOpInvokable.class);
+		source2.setParallelism(parallelism);
+		source2.setSlotSharingGroup(group2);
+
+		final JobVertex sink = new JobVertex("sink");
+		sink.setInvokableClass(Receiver.class);
+		sink.setParallelism(parallelism);
+		sink.setSlotSharingGroup(group1);
+
+		sink.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+		sink.connectNewDataSetAsInput(source2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+		final JobGraph jobGraph = new JobGraph(source1, source2, sink);
+
+		jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
+
+		return jobGraph;
+	}
+
+	/**
+	 * This invokable is used by source1. It sends data to trigger the scheduling
+	 * of the sink task. The subtask with index 0 additionally waits for a short time
+	 * (2 seconds) before finishing, so that the scheduled sink task can directly use its slot.
+	 */
+	public static class PipelinedSender extends AbstractInvokable {
+
+		public PipelinedSender(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			// the task must have been wired with at least one output
+			if (getEnvironment().getAllWriters().length < 1) {
+				throw new IllegalStateException();
+			}
+
+			final RecordWriter<IntValue> writer = new RecordWriterBuilder<IntValue>().build(getEnvironment().getWriter(0));
+
+			try {
+				// emit a single record and flush it
+				writer.emit(new IntValue(42));
+				writer.flushAll();
+			} finally {
+				writer.clearBuffers();
+			}
+
+			// only the first subtask lingers; all other subtasks finish right away
+			if (getIndexInSubtaskGroup() == 0) {
+				Thread.sleep(2000);
+			}
+		}
+	}
+
+	/**
+	 * This invokable finishes only after all of its upstream tasks have finished.
+	 * Unexpected result partition errors can happen if a task finishes
+	 * later than its consumer task.
+	 */
+	public static class Receiver extends AbstractInvokable {
+
+		public Receiver(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			// the sink is connected to both sources, so at least two input gates are expected
+			if (getEnvironment().getAllInputGates().length < 2) {
+				throw new IllegalStateException();
+			}
+
+			final String[] tmpDirs = getEnvironment().getTaskManagerInfo().getTmpDirectories();
+			// one reader per input gate
+			final List<RecordReader<IntValue>> readers = Arrays.stream(getEnvironment().getAllInputGates())
+				.map(inputGate -> new RecordReader<>(inputGate, IntValue.class, tmpDirs))
+				.collect(Collectors.toList());
+
+			// drain the readers one after another, discarding every record
+			for (RecordReader<IntValue> reader : readers) {
+				while (reader.hasNext()) {
+					reader.next();
+				}
+			}
+		}
+	}
+}