You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/02/29 05:50:44 UTC
[flink] 03/03: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9e0465dff51d1a88d92c672cf0c7cd8a3179984a
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Sat Feb 29 13:49:07 2020 +0800
[FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing
---
.../flink/runtime/scheduler/DefaultScheduler.java | 2 +-
.../runtime/scheduler/DefaultSchedulerFactory.java | 2 +-
.../runtime/scheduler/DefaultSchedulerTest.java | 41 +---
.../runtime/scheduler/SchedulerTestingUtils.java | 253 +++++++++++++++++++--
4 files changed, 245 insertions(+), 53 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index a425c9e..2049ec8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -93,7 +93,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
private final Set<ExecutionVertexID> verticesWaitingForRestart;
- public DefaultScheduler(
+ DefaultScheduler(
final Logger log,
final JobGraph jobGraph,
final BackPressureStatsTracker backPressureStatsTracker,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 0915126..1671b5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -107,7 +107,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy));
}
- private SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
+ static SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
switch (scheduleMode) {
case EAGER:
return new EagerSchedulingStrategy.Factory();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 3df9801..b4f39e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -22,11 +22,8 @@ package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -36,10 +33,8 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -50,8 +45,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
@@ -59,7 +52,6 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestSchedulingStrategy;
-import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
@@ -780,27 +772,18 @@ public class DefaultSchedulerTest extends TestLogger {
final JobGraph jobGraph,
final SchedulingStrategyFactory schedulingStrategyFactory) throws Exception {
- return new DefaultScheduler(
- log,
- jobGraph,
- VoidBackPressureStatsTracker.INSTANCE,
- executor,
- configuration,
- scheduledExecutorService,
- taskRestartExecutor,
- ClassLoader.getSystemClassLoader(),
- new StandaloneCheckpointRecoveryFactory(),
- Time.seconds(300),
- VoidBlobWriter.getInstance(),
- UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
- NettyShuffleMaster.INSTANCE,
- NoOpJobMasterPartitionTracker.INSTANCE,
- schedulingStrategyFactory,
- new RestartPipelinedRegionFailoverStrategy.Factory(),
- testRestartBackoffTimeStrategy,
- testExecutionVertexOperations,
- executionVertexVersioner,
- executionSlotAllocatorFactory);
+ return SchedulerTestingUtils.newSchedulerBuilder(jobGraph)
+ .setLogger(log)
+ .setIoExecutor(executor)
+ .setJobMasterConfiguration(configuration)
+ .setFutureExecutor(scheduledExecutorService)
+ .setDelayExecutor(taskRestartExecutor)
+ .setSchedulingStrategyFactory(schedulingStrategyFactory)
+ .setRestartBackoffTimeStrategy(testRestartBackoffTimeStrategy)
+ .setExecutionVertexOperations(testExecutionVertexOperations)
+ .setExecutionVertexVersioner(executionVertexVersioner)
+ .setExecutionSlotAllocatorFactory(executionSlotAllocatorFactory)
+ .build();
}
private void startScheduling(final SchedulerNG scheduler) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index d7289b3..2a2c73e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -21,37 +21,52 @@ package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
-import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
@@ -61,6 +76,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -76,8 +93,47 @@ public class SchedulerTestingUtils {
private static final long DEFAULT_CHECKPOINT_TIMEOUT_MS = 10 * 60 * 1000;
+ private static final Time DEFAULT_TIMEOUT = Time.seconds(300);
+
private SchedulerTestingUtils() {}
+ public static DefaultSchedulerBuilder newSchedulerBuilder(final JobGraph jobGraph) {
+ return new DefaultSchedulerBuilder(jobGraph);
+ }
+
+ public static DefaultSchedulerBuilder newSchedulerBuilderWithDefaultSlotAllocator(
+ final JobGraph jobGraph,
+ final SlotProvider slotProvider) {
+
+ return newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, slotProvider, DEFAULT_TIMEOUT);
+ }
+
+ public static DefaultSchedulerBuilder newSchedulerBuilderWithDefaultSlotAllocator(
+ final JobGraph jobGraph,
+ final SlotProvider slotProvider,
+ final Time slotRequestTimeout) {
+
+ return new DefaultSchedulerBuilder(jobGraph)
+ .setExecutionSlotAllocatorFactory(
+ createDefaultExecutionSlotAllocatorFactory(jobGraph.getScheduleMode(), slotProvider, slotRequestTimeout));
+ }
+
+ public static DefaultScheduler createScheduler(
+ final JobGraph jobGraph,
+ final SlotProvider slotProvider) throws Exception {
+
+ return createScheduler(jobGraph, slotProvider, DEFAULT_TIMEOUT);
+ }
+
+ public static DefaultScheduler createScheduler(
+ final JobGraph jobGraph,
+ final SlotProvider slotProvider,
+ final Time slotRequestTimeout) throws Exception {
+
+ return newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, slotProvider, slotRequestTimeout)
+ .build();
+ }
+
public static DefaultScheduler createScheduler(
JobGraph jobGraph,
ManuallyTriggeredScheduledExecutorService asyncExecutor) throws Exception {
@@ -102,27 +158,26 @@ public class SchedulerTestingUtils {
ManuallyTriggeredScheduledExecutorService asyncExecutor,
TaskManagerGateway taskManagerGateway) throws Exception {
- return new DefaultScheduler(
- LOG,
- jobGraph,
- VoidBackPressureStatsTracker.INSTANCE,
- Executors.directExecutor(),
- new Configuration(),
- asyncExecutor,
- asyncExecutor,
- ClassLoader.getSystemClassLoader(),
- new StandaloneCheckpointRecoveryFactory(),
- Time.seconds(300),
- VoidBlobWriter.getInstance(),
- UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
- NettyShuffleMaster.INSTANCE,
- NoOpJobMasterPartitionTracker.INSTANCE,
- new EagerSchedulingStrategy.Factory(),
- new RestartPipelinedRegionFailoverStrategy.Factory(),
- new TestRestartBackoffTimeStrategy(true, 0),
- new DefaultExecutionVertexOperations(),
- new ExecutionVertexVersioner(),
- new TestExecutionSlotAllocatorFactory(taskManagerGateway));
+ return newSchedulerBuilder(jobGraph)
+ .setFutureExecutor(asyncExecutor)
+ .setDelayExecutor(asyncExecutor)
+ .setSchedulingStrategyFactory(new EagerSchedulingStrategy.Factory())
+ .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0))
+ .setExecutionSlotAllocatorFactory(new TestExecutionSlotAllocatorFactory(taskManagerGateway))
+ .build();
+ }
+
+ public static DefaultExecutionSlotAllocatorFactory createDefaultExecutionSlotAllocatorFactory(
+ final ScheduleMode scheduleMode,
+ final SlotProvider slotProvider,
+ final Time slotRequestTimeout) {
+
+ final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from(
+ scheduleMode,
+ slotProvider,
+ slotRequestTimeout);
+
+ return new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy);
}
public static void enableCheckpointing(final JobGraph jobGraph) {
@@ -212,4 +267,158 @@ public class SchedulerTestingUtils {
return operatorGateway.sendOperatorEventToTask(task, operator, evt);
}
}
+
+ /**
+ * Builder for {@link DefaultScheduler}.
+ */
+ public static class DefaultSchedulerBuilder {
+ private final JobGraph jobGraph;
+
+ private SchedulingStrategyFactory schedulingStrategyFactory;
+
+ private Logger log = LOG;
+ private BackPressureStatsTracker backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE;
+ private Executor ioExecutor = TestingUtils.defaultExecutor();
+ private Configuration jobMasterConfiguration = new Configuration();
+ private ScheduledExecutorService futureExecutor = TestingUtils.defaultExecutor();
+ private ScheduledExecutor delayExecutor = new ScheduledExecutorServiceAdapter(futureExecutor);
+ private ClassLoader userCodeLoader = ClassLoader.getSystemClassLoader();
+ private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
+ private Time rpcTimeout = DEFAULT_TIMEOUT;
+ private BlobWriter blobWriter = VoidBlobWriter.getInstance();
+ private JobManagerJobMetricGroup jobManagerJobMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
+ private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE;
+ private JobMasterPartitionTracker partitionTracker = NoOpJobMasterPartitionTracker.INSTANCE;
+ private FailoverStrategy.Factory failoverStrategyFactory = new RestartPipelinedRegionFailoverStrategy.Factory();
+ private RestartBackoffTimeStrategy restartBackoffTimeStrategy = NoRestartBackoffTimeStrategy.INSTANCE;
+ private ExecutionVertexOperations executionVertexOperations = new DefaultExecutionVertexOperations();
+ private ExecutionVertexVersioner executionVertexVersioner = new ExecutionVertexVersioner();
+ private ExecutionSlotAllocatorFactory executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
+
+ private DefaultSchedulerBuilder(final JobGraph jobGraph) {
+ this.jobGraph = jobGraph;
+
+ // scheduling strategy is by default set according to the scheduleMode. It can be re-assigned later.
+ this.schedulingStrategyFactory = DefaultSchedulerFactory.createSchedulingStrategyFactory(jobGraph.getScheduleMode());
+ }
+
+ public DefaultSchedulerBuilder setLogger(final Logger log) {
+ this.log = log;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setBackPressureStatsTracker(final BackPressureStatsTracker backPressureStatsTracker) {
+ this.backPressureStatsTracker = backPressureStatsTracker;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setIoExecutor(final Executor ioExecutor) {
+ this.ioExecutor = ioExecutor;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setJobMasterConfiguration(final Configuration jobMasterConfiguration) {
+ this.jobMasterConfiguration = jobMasterConfiguration;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setFutureExecutor(final ScheduledExecutorService futureExecutor) {
+ this.futureExecutor = futureExecutor;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setDelayExecutor(final ScheduledExecutor delayExecutor) {
+ this.delayExecutor = delayExecutor;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setUserCodeLoader(final ClassLoader userCodeLoader) {
+ this.userCodeLoader = userCodeLoader;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setCheckpointRecoveryFactory(final CheckpointRecoveryFactory checkpointRecoveryFactory) {
+ this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setRpcTimeout(final Time rpcTimeout) {
+ this.rpcTimeout = rpcTimeout;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setBlobWriter(final BlobWriter blobWriter) {
+ this.blobWriter = blobWriter;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setJobManagerJobMetricGroup(final JobManagerJobMetricGroup jobManagerJobMetricGroup) {
+ this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setShuffleMaster(final ShuffleMaster<?> shuffleMaster) {
+ this.shuffleMaster = shuffleMaster;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setPartitionTracker(final JobMasterPartitionTracker partitionTracker) {
+ this.partitionTracker = partitionTracker;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setSchedulingStrategyFactory(final SchedulingStrategyFactory schedulingStrategyFactory) {
+ this.schedulingStrategyFactory = schedulingStrategyFactory;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setFailoverStrategyFactory(final FailoverStrategy.Factory failoverStrategyFactory) {
+ this.failoverStrategyFactory = failoverStrategyFactory;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setRestartBackoffTimeStrategy(final RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+ this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setExecutionVertexOperations(final ExecutionVertexOperations executionVertexOperations) {
+ this.executionVertexOperations = executionVertexOperations;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setExecutionVertexVersioner(final ExecutionVertexVersioner executionVertexVersioner) {
+ this.executionVertexVersioner = executionVertexVersioner;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setExecutionSlotAllocatorFactory(final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory) {
+ this.executionSlotAllocatorFactory = executionSlotAllocatorFactory;
+ return this;
+ }
+
+ public DefaultScheduler build() throws Exception {
+ return new DefaultScheduler(
+ log,
+ jobGraph,
+ backPressureStatsTracker,
+ ioExecutor,
+ jobMasterConfiguration,
+ futureExecutor,
+ delayExecutor,
+ userCodeLoader,
+ checkpointRecoveryFactory,
+ rpcTimeout,
+ blobWriter,
+ jobManagerJobMetricGroup,
+ shuffleMaster,
+ partitionTracker,
+ schedulingStrategyFactory,
+ failoverStrategyFactory,
+ restartBackoffTimeStrategy,
+ executionVertexOperations,
+ executionVertexVersioner,
+ executionSlotAllocatorFactory);
+ }
+ }
}