You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/11/13 21:22:24 UTC
[5/5] aurora git commit: Remove the internal SQL database
Remove the internal SQL database
Reviewed at https://reviews.apache.org/r/63743/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/94276046
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/94276046
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/94276046
Branch: refs/heads/master
Commit: 94276046606da4e1491ee3d0e0c29cd3649a82e6
Parents: e0624b2
Author: Bill Farner <wf...@apache.org>
Authored: Mon Nov 13 13:19:28 2017 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Mon Nov 13 13:19:28 2017 -0800
----------------------------------------------------------------------
RELEASE-NOTES.md | 5 +
.../thrift/org/apache/aurora/gen/storage.thrift | 2 +-
build.gradle | 6 -
.../aurora/benchmark/SchedulingBenchmarks.java | 74 ++-
.../aurora/benchmark/TaskStoreBenchmarks.java | 26 -
.../aurora/scheduler/async/AsyncModule.java | 25 +-
.../aurora/scheduler/async/DelayExecutor.java | 33 -
.../aurora/scheduler/async/GatedWorkQueue.java | 41 --
.../scheduler/async/GatingDelayExecutor.java | 98 ---
.../aurora/scheduler/http/H2ConsoleModule.java | 63 --
.../http/api/security/HttpSecurityModule.java | 11 +-
.../aurora/scheduler/offers/Deferment.java | 10 +-
.../scheduler/pruning/TaskHistoryPruner.java | 12 +-
.../scheduler/reconciliation/KillRetry.java | 11 +-
.../scheduler/reconciliation/TaskTimeout.java | 14 +-
.../aurora/scheduler/scheduling/TaskGroups.java | 9 +-
.../scheduler/scheduling/TaskThrottler.java | 15 +-
.../scheduler/storage/db/AttributeMapper.java | 83 ---
.../scheduler/storage/db/CronJobMapper.java | 40 --
.../scheduler/storage/db/DbAttributeStore.java | 95 ---
.../scheduler/storage/db/DbCronJobStore.java | 84 ---
.../scheduler/storage/db/DbJobUpdateStore.java | 268 ---------
.../scheduler/storage/db/DbLockStore.java | 81 ---
.../aurora/scheduler/storage/db/DbModule.java | 364 -----------
.../scheduler/storage/db/DbQuotaStore.java | 82 ---
.../scheduler/storage/db/DbSchedulerStore.java | 48 --
.../aurora/scheduler/storage/db/DbStorage.java | 242 --------
.../scheduler/storage/db/DbTaskStore.java | 207 -------
.../aurora/scheduler/storage/db/DbUtil.java | 76 ---
.../scheduler/storage/db/EnumBackfill.java | 75 ---
.../scheduler/storage/db/EnumValueMapper.java | 31 -
.../scheduler/storage/db/FrameworkIdMapper.java | 26 -
.../storage/db/GarbageCollectedTableMapper.java | 33 -
.../scheduler/storage/db/InsertResult.java | 36 --
.../storage/db/InstrumentingInterceptor.java | 139 -----
.../db/JobInstanceUpdateEventMapper.java | 34 --
.../scheduler/storage/db/JobKeyMapper.java | 36 --
.../storage/db/JobUpdateDetailsMapper.java | 210 -------
.../storage/db/JobUpdateEventMapper.java | 34 --
.../scheduler/storage/db/LockKeyMapper.java | 49 --
.../aurora/scheduler/storage/db/LockMapper.java | 53 --
.../scheduler/storage/db/MigrationManager.java | 29 -
.../storage/db/MigrationManagerImpl.java | 134 -----
.../scheduler/storage/db/MigrationMapper.java | 51 --
.../scheduler/storage/db/MyBatisCacheImpl.java | 119 ----
.../scheduler/storage/db/PruneVictim.java | 40 --
.../scheduler/storage/db/QuotaMapper.java | 79 ---
.../storage/db/RowGarbageCollector.java | 99 ---
.../scheduler/storage/db/TaskConfigManager.java | 161 -----
.../scheduler/storage/db/TaskConfigMapper.java | 210 -------
.../aurora/scheduler/storage/db/TaskMapper.java | 99 ---
.../migration/V001_CreateAppcImagesTable.java | 46 --
.../migration/V002_CreateDockerImagesTable.java | 46 --
.../V003_CreateResourceTypesTable.java | 56 --
.../migration/V004_CreateTaskResourceTable.java | 74 ---
.../V005_CreateQuotaResourceTable.java | 68 ---
.../db/migration/V006_PopulateTierField.java | 51 --
.../V007_CreateMesosFetcherURIsTable.java | 46 --
.../V008_CreateUpdateMetadataTable.java | 45 --
.../V009_CreateContainerVolumesTable.java | 53 --
.../migration/V010_RemoveUniqueConstraint.java | 41 --
.../typehandlers/AbstractTEnumTypeHandler.java | 70 ---
.../CronCollisionPolicyTypeHandler.java | 26 -
.../JobUpdateActionTypeHandler.java | 26 -
.../JobUpdateStatusTypeHandler.java | 26 -
.../MaintenanceModeTypeHandler.java | 26 -
.../db/typehandlers/ResourceTypeHandler.java | 26 -
.../typehandlers/ScheduleStatusTypeHandler.java | 26 -
.../storage/db/typehandlers/TypeHandlers.java | 41 --
.../db/typehandlers/VolumeModeTypeHandler.java | 23 -
.../scheduler/storage/db/views/DBResource.java | 32 -
.../storage/db/views/DBResourceAggregate.java | 56 --
.../scheduler/storage/db/views/DBSaveQuota.java | 29 -
.../storage/db/views/DbAssginedPort.java | 30 -
.../storage/db/views/DbAssignedTask.java | 48 --
.../storage/db/views/DbConstraint.java | 30 -
.../scheduler/storage/db/views/DbContainer.java | 38 --
.../scheduler/storage/db/views/DbImage.java | 38 --
.../storage/db/views/DbInstanceTaskConfig.java | 33 -
.../storage/db/views/DbJobConfiguration.java | 43 --
.../scheduler/storage/db/views/DbJobUpdate.java | 36 --
.../storage/db/views/DbJobUpdateDetails.java | 33 -
.../db/views/DbJobUpdateInstructions.java | 45 --
.../storage/db/views/DbScheduledTask.java | 54 --
.../db/views/DbStoredJobUpdateDetails.java | 30 -
.../storage/db/views/DbTaskConfig.java | 101 ----
.../storage/db/views/DbTaskConstraint.java | 45 --
.../scheduler/storage/db/views/LockRow.java | 46 --
.../db/views/MigrationChangelogEntry.java | 48 --
.../scheduler/storage/db/views/Pairs.java | 38 --
.../storage/log/SnapshotStoreImpl.java | 86 ---
.../scheduler/storage/db/AttributeMapper.xml | 90 ---
.../scheduler/storage/db/CronJobMapper.xml | 109 ----
.../scheduler/storage/db/EnumValueMapper.xml | 15 -
.../scheduler/storage/db/FrameworkIdMapper.xml | 32 -
.../storage/db/JobInstanceUpdateEventMapper.xml | 33 -
.../scheduler/storage/db/JobKeyMapper.xml | 47 --
.../storage/db/JobUpdateDetailsMapper.xml | 598 -------------------
.../storage/db/JobUpdateEventMapper.xml | 35 --
.../aurora/scheduler/storage/db/LockMapper.xml | 83 ---
.../scheduler/storage/db/MigrationMapper.xml | 55 --
.../aurora/scheduler/storage/db/QuotaMapper.xml | 91 ---
.../scheduler/storage/db/TaskConfigMapper.xml | 460 --------------
.../aurora/scheduler/storage/db/TaskMapper.xml | 241 --------
.../aurora/scheduler/storage/db/schema.sql | 392 ------------
.../aurora/scheduler/async/AsyncModuleTest.java | 3 +-
.../async/GatingDelayExecutorTest.java | 151 -----
.../scheduler/http/H2ConsoleModuleIT.java | 40 --
.../http/api/security/HttpSecurityIT.java | 48 --
.../scheduler/offers/OfferManagerImplTest.java | 6 +-
.../pruning/TaskHistoryPrunerTest.java | 15 +-
.../scheduler/reconciliation/KillRetryTest.java | 9 +-
.../reconciliation/TaskTimeoutTest.java | 13 +-
.../scheduler/scheduling/TaskGroupsTest.java | 6 +-
.../scheduler/scheduling/TaskThrottlerTest.java | 14 +-
.../scheduler/storage/backup/RecoveryTest.java | 2 -
.../storage/db/AttributeStoreTest.java | 24 -
.../scheduler/storage/db/CronJobStoreTest.java | 39 --
.../scheduler/storage/db/DbStorageTest.java | 117 ----
.../db/InstrumentingInterceptorTest.java | 162 -----
.../storage/db/JobUpdateStoreTest.java | 25 -
.../scheduler/storage/db/LockStoreTest.java | 24 -
.../storage/db/MigrationManagerImplIT.java | 157 -----
.../storage/db/MyBatisCacheImplTest.java | 52 --
.../scheduler/storage/db/QuotaStoreTest.java | 24 -
.../storage/db/RowGarbageCollectorTest.java | 113 ----
.../storage/db/SchedulerStoreTest.java | 24 -
.../scheduler/storage/db/TaskStoreTest.java | 39 --
.../db/testmigration/V001_TestMigration.java | 40 --
.../db/testmigration/V002_TestMigration2.java | 40 --
.../storage/log/SnapshotStoreImplIT.java | 93 +--
.../storage/mem/MemCronJobStoreTest.java | 10 -
.../scheduler/storage/mem/MemTaskStoreTest.java | 10 -
.../testing/FakeScheduledExecutor.java | 15 +-
134 files changed, 162 insertions(+), 9091 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 92f9c98..d653b79 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -5,6 +5,11 @@
- Updated to Mesos 1.4.0.
+### Deprecations and removals:
+
+- Removed the ability to recover from SQL-based backups and snapshots. A 0.20.0 scheduler
+ will not be able to recover backups or replicated log data created prior to 0.19.0.
+
0.19.0
======
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
index ccb5825..74983ba 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
@@ -144,7 +144,7 @@ struct Snapshot {
8: set<QuotaConfiguration> quotaConfigurations
9: set<api.Lock> locks
10: set<StoredJobUpdateDetails> jobUpdateDetails
- 11: list<string> dbScript
+ //11: removed
//12: removed
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index aa416b7..af11991 100644
--- a/build.gradle
+++ b/build.gradle
@@ -87,7 +87,6 @@ For more details, please see https://issues.apache.org/jira/browse/AURORA-1169
ext.jerseyRev = '1.19'
ext.junitRev = '4.12'
ext.logbackRev = '1.2.3'
- ext.mybatisRev = '3.4.1'
ext.nettyRev = '4.0.52.Final'
ext.protobufRev = '3.3.0'
ext.servletRev = '3.1.0'
@@ -120,7 +119,6 @@ For more details, please see https://issues.apache.org/jira/browse/AURORA-1169
force "org.apache.zookeeper:zookeeper:${zookeeperRev}"
force "org.hamcrest:hamcrest-core:1.3"
force "org.slf4j:slf4j-api:${slf4jRev}"
- force "org.mybatis:mybatis:${mybatisRev}"
}
}
}
@@ -373,7 +371,6 @@ dependencies {
compile "com.google.inject:guice:${guiceRev}"
compile "com.google.inject.extensions:guice-assistedinject:${guiceRev}"
compile "com.google.protobuf:protobuf-java:${protobufRev}"
- compile 'com.h2database:h2:1.4.196'
compile 'com.hubspot.jackson:jackson-datatype-protobuf:0.9.3'
compile "com.fasterxml.jackson.core:jackson-core:${jacksonRev}"
compile "com.sun.jersey:jersey-core:${jerseyRev}"
@@ -396,9 +393,6 @@ dependencies {
compile "org.eclipse.jetty:jetty-server:${jettyDep}"
compile "org.eclipse.jetty:jetty-servlet:${jettyDep}"
compile "org.eclipse.jetty:jetty-servlets:${jettyDep}"
- compile "org.mybatis:mybatis:${mybatisRev}"
- compile 'org.mybatis:mybatis-guice:3.7'
- compile 'org.mybatis:mybatis-migrations:3.2.0'
compile 'org.quartz-scheduler:quartz:2.2.2'
testCompile "com.sun.jersey:jersey-client:${jerseyRev}"
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 292bb29..1708a50 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -15,6 +15,10 @@ package org.apache.aurora.benchmark;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.inject.Singleton;
@@ -46,7 +50,6 @@ import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.TaskIdGenerator;
import org.apache.aurora.scheduler.TierModule;
import org.apache.aurora.scheduler.async.AsyncModule;
-import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.config.CliOptions;
import org.apache.aurora.scheduler.config.CommandLine;
@@ -138,20 +141,11 @@ public class SchedulingBenchmarks {
new PrivateModule() {
@Override
protected void configure() {
+
// We use a no-op executor for async work, as this benchmark is focused on the
// synchronous scheduling operations.
- bind(DelayExecutor.class).annotatedWith(AsyncModule.AsyncExecutor.class)
- .toInstance(new DelayExecutor() {
- @Override
- public void execute(Runnable work, Amount<Long, Time> minDelay) {
- // No-op.
- }
-
- @Override
- public void execute(Runnable command) {
- // No-op.
- }
- });
+ bind(ScheduledExecutorService.class).annotatedWith(AsyncModule.AsyncExecutor.class)
+ .toInstance(new NoopExecutor());
bind(Deferment.class).to(Deferment.Noop.class);
bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
@@ -400,4 +394,58 @@ public class SchedulingBenchmarks {
return ImmutableSet.of("" + System.currentTimeMillis());
}
}
+
+ private static class NoopExecutor extends AbstractExecutorService
+ implements ScheduledExecutorService {
+
+ @Override
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ return null;
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+ return null;
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(
+ Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return null;
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(
+ Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return null;
+ }
+
+ @Override
+ public void shutdown() {
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return null;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return false;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return false;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return false;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java
index 6f2f9f4..9ec9865 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java
@@ -27,7 +27,6 @@ import org.apache.aurora.common.util.testing.FakeClock;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
@@ -111,29 +110,4 @@ public class TaskStoreBenchmarks {
storage.read(store -> store.getTaskStore().fetchTasks(Query.unscoped())));
}
}
-
- public static class DBFetchTasksBenchmark extends AbstractFetchTasksBenchmark {
- @Setup(Level.Trial)
- @Override
- public void setUp() {
- storage = DbUtil.createStorage();
- }
-
- @Setup(Level.Iteration)
- public void setUpIteration() {
- createTasks(numTasks);
- }
-
- @TearDown(Level.Iteration)
- public void tearDownIteration() {
- deleteTasks();
- }
-
- @Benchmark
- public int run() {
- // Iterate through results in case the result is lazily computed.
- return Iterables.size(
- storage.read(store -> store.getTaskStore().fetchTasks(Query.unscoped())));
- }
- }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index 68f7ddb..0166d41 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -81,20 +81,15 @@ public class AsyncModule extends AbstractModule {
@Override
protected void configure() {
bind(ScheduledThreadPoolExecutor.class).toInstance(afterTransaction);
- bind(ScheduledExecutorService.class).toInstance(afterTransaction);
-
- bind(GatingDelayExecutor.class).in(Singleton.class);
- expose(GatingDelayExecutor.class);
-
bind(RegisterGauges.class).in(Singleton.class);
expose(RegisterGauges.class);
}
});
SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterGauges.class);
- bind(Executor.class).annotatedWith(AsyncExecutor.class).to(GatingDelayExecutor.class);
- bind(DelayExecutor.class).annotatedWith(AsyncExecutor.class).to(GatingDelayExecutor.class);
- bind(GatedWorkQueue.class).annotatedWith(AsyncExecutor.class).to(GatingDelayExecutor.class);
+ bind(Executor.class).annotatedWith(AsyncExecutor.class).toInstance(afterTransaction);
+ bind(ScheduledExecutorService.class).annotatedWith(AsyncExecutor.class)
+ .toInstance(afterTransaction);
}
static class RegisterGauges extends AbstractIdleService {
@@ -104,31 +99,19 @@ public class AsyncModule extends AbstractModule {
@VisibleForTesting
static final String ASYNC_TASKS_GAUGE = "async_tasks_completed";
- @VisibleForTesting
- static final String DELAY_QUEUE_GAUGE = "delay_executor_queue_size";
-
private final StatsProvider statsProvider;
private final ScheduledThreadPoolExecutor executor;
- private final GatingDelayExecutor delayExecutor;
@Inject
- RegisterGauges(
- StatsProvider statsProvider,
- ScheduledThreadPoolExecutor executor,
- GatingDelayExecutor delayExecutor) {
-
+ RegisterGauges(StatsProvider statsProvider, ScheduledThreadPoolExecutor executor) {
this.statsProvider = requireNonNull(statsProvider);
this.executor = requireNonNull(executor);
- this.delayExecutor = requireNonNull(delayExecutor);
}
@Override
protected void startUp() {
statsProvider.makeGauge(TIMEOUT_QUEUE_GAUGE, () -> executor.getQueue().size());
statsProvider.makeGauge(ASYNC_TASKS_GAUGE, executor::getCompletedTaskCount);
- // Using a lambda rather than method ref to sidestep a bug in PMD that makes it think
- // delayExecutor is unused.
- statsProvider.makeGauge(DELAY_QUEUE_GAUGE, delayExecutor::getQueueSize);
}
@Override
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/async/DelayExecutor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/DelayExecutor.java b/src/main/java/org/apache/aurora/scheduler/async/DelayExecutor.java
deleted file mode 100644
index c851e5b..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/DelayExecutor.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.concurrent.Executor;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-
-/**
- * An executor that supports executing work after a minimum time delay.
- */
-public interface DelayExecutor extends Executor {
-
- /**
- * Executes {@code work} after no less than {@code minDelay}.
- *
- * @param work Work to execute.
- * @param minDelay Minimum amount of time to wait before executing the work.
- */
- void execute(Runnable work, Amount<Long, Time> minDelay);
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java b/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java
deleted file mode 100644
index 7032271..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-/**
- * A work queue that only executes pending work when flushed.
- */
-public interface GatedWorkQueue {
-
- /**
- * Closes the gate on the work queue for the duration of an operation.
- *
- * @param operation Operation to execute while keeping the gate closed.
- * @param <T> Operation return type.
- * @param <E> Operation exception type.
- * @return The value returned by the {@code operation}.
- * @throws E Exception thrown by the {@code operation}.
- */
- <T, E extends Exception> T closeDuring(GatedOperation<T, E> operation) throws E;
-
- /**
- * Operation prevents new items from being executed on the work queue.
- *
- * @param <T> Operation return type.
- * @param <E> Operation exception type.
- */
- interface GatedOperation<T, E extends Exception> {
- T doWithGateClosed() throws E;
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java b/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java
deleted file mode 100644
index a7240ae..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.Queue;
-import java.util.concurrent.ScheduledExecutorService;
-
-import javax.inject.Inject;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * An executor that may be temporarily gated with {@link #closeDuring(GatedOperation)}. When the
- * executor is gated, newly-submitted work will be enqueued and executed once the gate is opened as
- * a result of {@link #closeDuring(GatedOperation)} returning.
- */
-class GatingDelayExecutor implements DelayExecutor, GatedWorkQueue {
-
- private final ScheduledExecutorService gated;
- private final Queue<Runnable> queue = Lists.newLinkedList();
-
- /**
- * Creates a gating delay executor that will gate work from the provided executor.
- *
- * @param gated Delegate to execute work with when ungated.
- */
- @Inject
- GatingDelayExecutor(ScheduledExecutorService gated) {
- this.gated = requireNonNull(gated);
- }
-
- private final ThreadLocal<Boolean> isOpen = new ThreadLocal<Boolean>() {
- @Override
- protected Boolean initialValue() {
- return true;
- }
- };
-
- @Override
- public <T, E extends Exception> T closeDuring(GatedOperation<T, E> operation) throws E {
- boolean startedOpen = isOpen.get();
- isOpen.set(false);
-
- try {
- return operation.doWithGateClosed();
- } finally {
- if (startedOpen) {
- isOpen.set(true);
- flush();
- }
- }
- }
-
- synchronized int getQueueSize() {
- return queue.size();
- }
-
- private synchronized void enqueue(Runnable work) {
- if (isOpen.get()) {
- work.run();
- } else {
- queue.add(work);
- }
- }
-
- private synchronized void flush() {
- for (Runnable work : Iterables.consumingIterable(queue)) {
- work.run();
- }
- }
-
- @Override
- public synchronized void execute(Runnable command) {
- enqueue(() -> gated.execute(command));
- }
-
- @Override
- public synchronized void execute(Runnable work, Amount<Long, Time> minDelay) {
- enqueue(() -> gated.schedule(work, minDelay.getValue(), minDelay.getUnit().getTimeUnit()));
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/http/H2ConsoleModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/H2ConsoleModule.java b/src/main/java/org/apache/aurora/scheduler/http/H2ConsoleModule.java
deleted file mode 100644
index ee10f47..0000000
--- a/src/main/java/org/apache/aurora/scheduler/http/H2ConsoleModule.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.http;
-
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.servlet.ServletModule;
-
-import org.h2.server.web.WebServlet;
-
-/**
- * Binding module for the H2 management console.
- * <p>
- * See: http://www.h2database.com/html/tutorial.html#tutorial_starting_h2_console
- */
-public class H2ConsoleModule extends ServletModule {
- public static final String H2_PATH = "/h2console";
- public static final String H2_PERM = "h2_management_console";
-
- @Parameters(separators = "=")
- public static class Options {
- @Parameter(
- names = "-enable_h2_console",
- description = "Enable H2 DB management console.",
- arity = 1)
- public boolean enableH2Console = false;
- }
-
- private final boolean enabled;
-
- public H2ConsoleModule(Options options) {
- this(options.enableH2Console);
- }
-
- @VisibleForTesting
- public H2ConsoleModule(boolean enabled) {
- this.enabled = enabled;
- }
-
- @Override
- protected void configureServlets() {
- if (enabled) {
- filter(H2_PATH, H2_PATH + "/*").through(LeaderRedirectFilter.class);
- serve(H2_PATH, H2_PATH + "/*").with(new WebServlet(), ImmutableMap.of(
- "webAllowOthers", "true",
- "ifExists", "true"
- ));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityModule.java b/src/main/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityModule.java
index 5229450..d81671c 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityModule.java
@@ -54,8 +54,6 @@ import org.apache.shiro.subject.Subject;
import static java.util.Objects.requireNonNull;
-import static org.apache.aurora.scheduler.http.H2ConsoleModule.H2_PATH;
-import static org.apache.aurora.scheduler.http.H2ConsoleModule.H2_PERM;
import static org.apache.aurora.scheduler.http.api.ApiModule.API_PATH;
import static org.apache.aurora.scheduler.spi.Permissions.Domain.THRIFT_AURORA_ADMIN;
import static org.apache.shiro.guice.web.ShiroWebModule.guiceFilterModule;
@@ -68,12 +66,9 @@ import static org.apache.shiro.web.filter.authc.AuthenticatingFilter.PERMISSIVE;
* included with this package.
*/
public class HttpSecurityModule extends ServletModule {
- public static final String HTTP_REALM_NAME = "Apache Aurora Scheduler";
+ private static final String HTTP_REALM_NAME = "Apache Aurora Scheduler";
- private static final String H2_PATTERN = H2_PATH + "/**";
private static final String ALL_PATTERN = "/**";
- private static final Key<? extends Filter> K_STRICT =
- Key.get(ShiroKerberosAuthenticationFilter.class);
private static final Key<? extends Filter> K_PERMISSIVE =
Key.get(ShiroKerberosPermissiveAuthenticationFilter.class);
@@ -176,8 +171,6 @@ public class HttpSecurityModule extends ServletModule {
}
});
install(guiceFilterModule(API_PATH));
- install(guiceFilterModule(H2_PATH));
- install(guiceFilterModule(H2_PATH + "/*"));
install(new ShiroWebModule(getServletContext()) {
// Replace the ServletContainerSessionManager which causes subject.runAs(...) in a
@@ -200,12 +193,10 @@ public class HttpSecurityModule extends ServletModule {
// more specific pattern first.
switch (mechanism) {
case BASIC:
- addFilterChain(H2_PATTERN, NO_SESSION_CREATION, AUTHC_BASIC, config(PERMS, H2_PERM));
addFilterChainWithAfterAuthFilter(config(AUTHC_BASIC, PERMISSIVE));
break;
case NEGOTIATE:
- addFilterChain(H2_PATTERN, NO_SESSION_CREATION, K_STRICT, config(PERMS, H2_PERM));
addFilterChainWithAfterAuthFilter(K_PERMISSIVE);
break;
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java b/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java
index f3ec886..90a4428 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java
@@ -13,6 +13,8 @@
*/
package org.apache.aurora.scheduler.offers;
+import java.util.concurrent.ScheduledExecutorService;
+
import javax.inject.Inject;
import com.google.common.base.Supplier;
@@ -20,7 +22,6 @@ import com.google.common.base.Supplier;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.async.DelayExecutor;
import static java.util.Objects.requireNonNull;
@@ -51,12 +52,12 @@ public interface Deferment {
*/
class DelayedDeferment implements Deferment {
private final Supplier<Amount<Long, Time>> delay;
- private final DelayExecutor executor;
+ private final ScheduledExecutorService executor;
@Inject
public DelayedDeferment(
Supplier<Amount<Long, Time>> delay,
- @AsyncExecutor DelayExecutor executor) {
+ @AsyncExecutor ScheduledExecutorService executor) {
this.delay = requireNonNull(delay);
this.executor = requireNonNull(executor);
@@ -64,7 +65,8 @@ public interface Deferment {
@Override
public void defer(Runnable action) {
- executor.execute(action, delay.get());
+ Amount<Long, Time> actionDelay = delay.get();
+ executor.schedule(action, actionDelay.getValue(), actionDelay.getUnit().getTimeUnit());
}
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
index f778494..3cafbc2 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -14,6 +14,8 @@
package org.apache.aurora.scheduler.pruning;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
@@ -34,7 +36,6 @@ import org.apache.aurora.gen.apiConstants;
import org.apache.aurora.scheduler.BatchWorker;
import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.state.StateManager;
@@ -61,7 +62,7 @@ public class TaskHistoryPruner implements EventSubscriber {
@VisibleForTesting
static final String TASKS_PRUNED = "tasks_pruned";
- private final DelayExecutor executor;
+ private final ScheduledExecutorService executor;
private final StateManager stateManager;
private final Clock clock;
private final HistoryPrunnerSettings settings;
@@ -96,7 +97,7 @@ public class TaskHistoryPruner implements EventSubscriber {
@Inject
TaskHistoryPruner(
- @AsyncExecutor DelayExecutor executor,
+ @AsyncExecutor ScheduledExecutorService executor,
StateManager stateManager,
Clock clock,
HistoryPrunnerSettings settings,
@@ -161,7 +162,7 @@ public class TaskHistoryPruner implements EventSubscriber {
LOG.debug("Prune task {} in {} ms.", taskId, timeRemaining);
- executor.execute(
+ executor.schedule(
shutdownOnError(
lifecycle,
LOG,
@@ -170,7 +171,8 @@ public class TaskHistoryPruner implements EventSubscriber {
LOG.info("Pruning expired inactive task " + taskId);
deleteTasks(ImmutableSet.of(taskId));
}),
- Amount.of(timeRemaining, Time.MILLISECONDS));
+ timeRemaining,
+ TimeUnit.MILLISECONDS);
executor.execute(
shutdownOnError(
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java
index 31afa7f..53a3a53 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java
@@ -13,6 +13,8 @@
*/
package org.apache.aurora.scheduler.reconciliation;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
@@ -21,13 +23,10 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.eventbus.Subscribe;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.BackoffStrategy;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -50,7 +49,7 @@ public class KillRetry implements EventSubscriber {
private final Driver driver;
private final Storage storage;
- private final DelayExecutor executor;
+ private final ScheduledExecutorService executor;
private final BackoffStrategy backoffStrategy;
private final AtomicLong killRetries;
@@ -58,7 +57,7 @@ public class KillRetry implements EventSubscriber {
KillRetry(
Driver driver,
Storage storage,
- @AsyncExecutor DelayExecutor executor,
+ @AsyncExecutor ScheduledExecutorService executor,
BackoffStrategy backoffStrategy,
StatsProvider statsProvider) {
@@ -86,7 +85,7 @@ public class KillRetry implements EventSubscriber {
void tryLater() {
retryInMs.set(backoffStrategy.calculateBackoffMs(retryInMs.get()));
- executor.execute(this, Amount.of(retryInMs.get(), Time.MILLISECONDS));
+ executor.schedule(this, retryInMs.get(), TimeUnit.MILLISECONDS);
}
@Override
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
index 8e9a0d3..9910e77 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
@@ -15,6 +15,8 @@ package org.apache.aurora.scheduler.reconciliation;
import java.util.EnumSet;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
@@ -29,7 +31,6 @@ import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateChangeResult;
@@ -65,7 +66,7 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
ScheduleStatus.KILLING,
ScheduleStatus.DRAINING);
- private final DelayExecutor executor;
+ private final ScheduledExecutorService executor;
private final Storage storage;
private final StateManager stateManager;
private final Amount<Long, Time> timeout;
@@ -73,7 +74,7 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
@Inject
TaskTimeout(
- @AsyncExecutor DelayExecutor executor,
+ @AsyncExecutor ScheduledExecutorService executor,
Storage storage,
StateManager stateManager,
Amount<Long, Time> timeout,
@@ -140,7 +141,7 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
LOG.debug("Retrying timeout of task {} in {}", taskId, NOT_STARTED_RETRY);
// TODO(wfarner): This execution should not wait for a transaction, but a second executor
// would be weird.
- executor.execute(this, NOT_STARTED_RETRY);
+ executor.schedule(this, NOT_STARTED_RETRY.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS);
}
}
}
@@ -148,9 +149,10 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
@Subscribe
public void recordStateChange(TaskStateChange change) {
if (isTransient(change.getNewState())) {
- executor.execute(
+ executor.schedule(
new TimedOutTaskHandler(change.getTaskId(), change.getNewState()),
- timeout);
+ timeout.as(Time.MILLISECONDS),
+ TimeUnit.MILLISECONDS);
}
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
index 2d3492d..b9987e4 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
@@ -19,6 +19,8 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
@@ -39,7 +41,6 @@ import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.BackoffStrategy;
import org.apache.aurora.scheduler.BatchWorker;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -73,7 +74,7 @@ public class TaskGroups implements EventSubscriber {
static final String SCHEDULE_ATTEMPTS_BLOCKS = "schedule_attempts_blocks";
private final ConcurrentMap<TaskGroupKey, TaskGroup> groups = Maps.newConcurrentMap();
- private final DelayExecutor executor;
+ private final ScheduledExecutorService executor;
private final TaskGroupsSettings settings;
private final TaskScheduler taskScheduler;
private final RescheduleCalculator rescheduleCalculator;
@@ -134,7 +135,7 @@ public class TaskGroups implements EventSubscriber {
@VisibleForTesting
@Inject
public TaskGroups(
- @AsyncExecutor DelayExecutor executor,
+ @AsyncExecutor ScheduledExecutorService executor,
TaskGroupsSettings settings,
TaskScheduler taskScheduler,
RescheduleCalculator rescheduleCalculator,
@@ -153,7 +154,7 @@ public class TaskGroups implements EventSubscriber {
// Avoid check-then-act by holding the intrinsic lock. If not done atomically, we could
// remove a group while a task is being added to it.
if (group.hasMore()) {
- executor.execute(evaluate, Amount.of(group.getPenaltyMs(), Time.MILLISECONDS));
+ executor.schedule(evaluate, group.getPenaltyMs(), TimeUnit.MILLISECONDS);
} else {
groups.remove(group.getKey());
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
index 867c9bd..24692b0 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
@@ -13,19 +13,19 @@
*/
package org.apache.aurora.scheduler.scheduling;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
import javax.inject.Inject;
import com.google.common.base.Optional;
import com.google.common.eventbus.Subscribe;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.SlidingStats;
import org.apache.aurora.common.util.Clock;
import org.apache.aurora.scheduler.BatchWorker;
import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -46,7 +46,7 @@ class TaskThrottler implements EventSubscriber {
private final RescheduleCalculator rescheduleCalculator;
private final Clock clock;
- private final DelayExecutor executor;
+ private final ScheduledExecutorService executor;
private final StateManager stateManager;
private final TaskEventBatchWorker batchWorker;
@@ -56,7 +56,7 @@ class TaskThrottler implements EventSubscriber {
TaskThrottler(
RescheduleCalculator rescheduleCalculator,
Clock clock,
- @AsyncExecutor DelayExecutor executor,
+ @AsyncExecutor ScheduledExecutorService executor,
StateManager stateManager,
TaskEventBatchWorker batchWorker) {
@@ -74,7 +74,7 @@ class TaskThrottler implements EventSubscriber {
+ rescheduleCalculator.getFlappingPenaltyMs(stateChange.getTask());
long delayMs = Math.max(0, readyAtMs - clock.nowMillis());
throttleStats.accumulate(delayMs);
- executor.execute(() ->
+ executor.schedule((Runnable) () ->
batchWorker.execute(storeProvider -> {
stateManager.changeState(
storeProvider,
@@ -84,7 +84,8 @@ class TaskThrottler implements EventSubscriber {
Optional.absent());
return BatchWorker.NO_RESULT;
}),
- Amount.of(delayMs, Time.MILLISECONDS));
+ delayMs,
+ TimeUnit.MILLISECONDS);
}
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/storage/db/AttributeMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/AttributeMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/AttributeMapper.java
deleted file mode 100644
index a454887..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/AttributeMapper.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.db;
-
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.ibatis.annotations.Param;
-
-/**
- * MyBatis mapper interface for Attribute.xml.
- */
-interface AttributeMapper {
- /**
- * Saves attributes for a host, based on {@link IHostAttributes#getHost()}.
- *
- * @param attributes Host attributes to save.
- */
- void insert(IHostAttributes attributes);
-
- /**
- * Deletes all attributes and attribute values associated with a slave.
- *
- * @param host Host to delete associated values from.
- */
- void deleteAttributeValues(@Param("host") String host);
-
- /**
- * Updates the mode and slave ID associated with a host.
- *
- * @param host Host to update.
- * @param mode New host maintenance mode.
- * @param slaveId New host slave ID.
- */
- void updateHostModeAndSlaveId(
- @Param("host") String host,
- @Param("mode") MaintenanceMode mode,
- @Param("slaveId") String slaveId);
-
- /**
- * Inserts values in {@link IHostAttributes#getAttributes()}, associating them with
- * {@link IHostAttributes#getSlaveId()}.
- *
- * @param attributes Attributes containing values to insert.
- */
- void insertAttributeValues(IHostAttributes attributes);
-
- /**
- * Retrieves the host attributes associated with a host.
- *
- * @param host Host to fetch attributes for.
- * @return Attributes associated with {@code host}, or {@code null} if no association exists.
- */
- @Nullable
- HostAttributes select(@Param("host") String host);
-
- /**
- * Retrieves all stored host attributes.
- *
- * @return All host attributes.
- */
- List<HostAttributes> selectAll();
-
- /**
- * Deletes all stored attributes and values.
- */
- void truncate();
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/storage/db/CronJobMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/CronJobMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/CronJobMapper.java
deleted file mode 100644
index b07928d..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/CronJobMapper.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.db;
-
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-import org.apache.aurora.scheduler.storage.db.views.DbJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.ibatis.annotations.Param;
-
-/**
- * MyBatis mapper for cron jobs.
- */
-interface CronJobMapper {
-
- void merge(@Param("job") IJobConfiguration job, @Param("task_config_id") long taskConfigId);
-
- void delete(@Param("job") IJobKey job);
-
- void truncate();
-
- List<DbJobConfiguration> selectAll();
-
- @Nullable
- DbJobConfiguration select(@Param("job") IJobKey job);
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java
deleted file mode 100644
index fee465b..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.db;
-
-import java.util.Objects;
-import java.util.Set;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.inject.Inject;
-
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.entities.IAttribute;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import static org.apache.aurora.common.base.MorePreconditions.checkNotBlank;
-import static org.apache.aurora.common.inject.TimedInterceptor.Timed;
-
-/**
- * Attribute store backed by a relational database.
- */
-class DbAttributeStore implements AttributeStore.Mutable {
-
- private final AttributeMapper mapper;
-
- @Inject
- DbAttributeStore(AttributeMapper mapper) {
- this.mapper = Objects.requireNonNull(mapper);
- }
-
- @Override
- public void deleteHostAttributes() {
- mapper.truncate();
- }
-
- @Timed("attribute_store_save")
- @Override
- public boolean saveHostAttributes(IHostAttributes hostAttributes) {
- checkNotBlank(hostAttributes.getHost());
- checkArgument(hostAttributes.isSetMode());
-
- if (Iterables.any(hostAttributes.getAttributes(), EMPTY_VALUES)) {
- throw new IllegalArgumentException(
- "Host attributes contains empty values: " + hostAttributes);
- }
-
- Optional<IHostAttributes> existing = getHostAttributes(hostAttributes.getHost());
- if (existing.equals(Optional.of(hostAttributes))) {
- return false;
- } else if (existing.isPresent()) {
- mapper.updateHostModeAndSlaveId(
- hostAttributes.getHost(),
- hostAttributes.getMode(),
- hostAttributes.getSlaveId());
- } else {
- mapper.insert(hostAttributes);
- }
-
- mapper.deleteAttributeValues(hostAttributes.getHost());
- if (!hostAttributes.getAttributes().isEmpty()) {
- mapper.insertAttributeValues(hostAttributes);
- }
-
- return true;
- }
-
- private static final Predicate<IAttribute> EMPTY_VALUES =
- attribute -> attribute.getValues().isEmpty();
-
- @Timed("attribute_store_fetch_one")
- @Override
- public Optional<IHostAttributes> getHostAttributes(String host) {
- return Optional.fromNullable(mapper.select(host)).transform(IHostAttributes::build);
- }
-
- @Timed("attribute_store_fetch_all")
- @Override
- public Set<IHostAttributes> getHostAttributes() {
- return IHostAttributes.setFromBuilders(mapper.selectAll());
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java
deleted file mode 100644
index e48a982..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.db;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.FluentIterable;
-
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.db.views.DbJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Cron job store backed by a relational database.
- */
-class DbCronJobStore implements CronJobStore.Mutable {
- private final CronJobMapper cronJobMapper;
- private final JobKeyMapper jobKeyMapper;
- private final TaskConfigManager taskConfigManager;
-
- @Inject
- DbCronJobStore(
- CronJobMapper cronJobMapper,
- JobKeyMapper jobKeyMapper,
- TaskConfigManager taskConfigManager) {
-
- this.cronJobMapper = requireNonNull(cronJobMapper);
- this.jobKeyMapper = requireNonNull(jobKeyMapper);
- this.taskConfigManager = requireNonNull(taskConfigManager);
- }
-
- @Timed("db_storage_cron_save_accepted_job")
- @Override
- public void saveAcceptedJob(IJobConfiguration jobConfig) {
- requireNonNull(jobConfig);
- jobKeyMapper.merge(jobConfig.getKey());
- cronJobMapper.merge(jobConfig, taskConfigManager.insert(jobConfig.getTaskConfig()));
- }
-
- @Timed("db_storage_cron_remove_job")
- @Override
- public void removeJob(IJobKey jobKey) {
- requireNonNull(jobKey);
- cronJobMapper.delete(jobKey);
- }
-
- @Timed("db_storage_cron_delete_jobs")
- @Override
- public void deleteJobs() {
- cronJobMapper.truncate();
- }
-
- @Timed("db_storage_cron_fetch_jobs")
- @Override
- public Iterable<IJobConfiguration> fetchJobs() {
- return FluentIterable.from(cronJobMapper.selectAll())
- .transform(DbJobConfiguration::toImmutable)
- .toList();
- }
-
- @Timed("db_storage_cron_fetch_job")
- @Override
- public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) {
- requireNonNull(jobKey);
- return Optional.fromNullable(cronJobMapper.select(jobKey))
- .transform(DbJobConfiguration::toImmutable);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java
deleted file mode 100644
index af854da..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.db;
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.aurora.common.stats.StatsProvider;
-import org.apache.aurora.gen.JobUpdateAction;
-import org.apache.aurora.gen.JobUpdateStatus;
-import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.Util;
-import org.apache.aurora.scheduler.storage.db.views.DbJobUpdate;
-import org.apache.aurora.scheduler.storage.db.views.DbJobUpdateInstructions;
-import org.apache.aurora.scheduler.storage.db.views.DbStoredJobUpdateDetails;
-import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
-import org.apache.aurora.scheduler.storage.entities.IMetadata;
-import org.apache.aurora.scheduler.storage.entities.IRange;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.common.inject.TimedInterceptor.Timed;
-
-/**
- * A relational database-backed job update store.
- */
-public class DbJobUpdateStore implements JobUpdateStore.Mutable {
-
- private final JobKeyMapper jobKeyMapper;
- private final JobUpdateDetailsMapper detailsMapper;
- private final JobUpdateEventMapper jobEventMapper;
- private final JobInstanceUpdateEventMapper instanceEventMapper;
- private final TaskConfigManager taskConfigManager;
- private final LoadingCache<JobUpdateStatus, AtomicLong> jobUpdateEventStats;
- private final LoadingCache<JobUpdateAction, AtomicLong> jobUpdateActionStats;
-
- @Inject
- DbJobUpdateStore(
- JobKeyMapper jobKeyMapper,
- JobUpdateDetailsMapper detailsMapper,
- JobUpdateEventMapper jobEventMapper,
- JobInstanceUpdateEventMapper instanceEventMapper,
- TaskConfigManager taskConfigManager,
- StatsProvider statsProvider) {
-
- this.jobKeyMapper = requireNonNull(jobKeyMapper);
- this.detailsMapper = requireNonNull(detailsMapper);
- this.jobEventMapper = requireNonNull(jobEventMapper);
- this.instanceEventMapper = requireNonNull(instanceEventMapper);
- this.taskConfigManager = requireNonNull(taskConfigManager);
- this.jobUpdateEventStats = CacheBuilder.newBuilder()
- .build(new CacheLoader<JobUpdateStatus, AtomicLong>() {
- @Override
- public AtomicLong load(JobUpdateStatus status) {
- return statsProvider.makeCounter(Util.jobUpdateStatusStatName(status));
- }
- });
- for (JobUpdateStatus status : JobUpdateStatus.values()) {
- jobUpdateEventStats.getUnchecked(status).get();
- }
- this.jobUpdateActionStats = CacheBuilder.newBuilder()
- .build(new CacheLoader<JobUpdateAction, AtomicLong>() {
- @Override
- public AtomicLong load(JobUpdateAction action) {
- return statsProvider.makeCounter(Util.jobUpdateActionStatName(action));
- }
- });
- for (JobUpdateAction action : JobUpdateAction.values()) {
- jobUpdateActionStats.getUnchecked(action).get();
- }
- }
-
- @Timed("job_update_store_save_update")
- @Override
- public void saveJobUpdate(IJobUpdate update, Optional<String> lockToken) {
- requireNonNull(update);
- if (!update.getInstructions().isSetDesiredState()
- && update.getInstructions().getInitialState().isEmpty()) {
- throw new IllegalArgumentException(
- "Missing both initial and desired states. At least one is required.");
- }
-
- IJobUpdateKey key = update.getSummary().getKey();
- jobKeyMapper.merge(key.getJob());
- detailsMapper.insert(update.newBuilder());
-
- if (lockToken.isPresent()) {
- detailsMapper.insertLockToken(key, lockToken.get());
- }
-
- if (!update.getSummary().getMetadata().isEmpty()) {
- detailsMapper.insertJobUpdateMetadata(
- key,
- IMetadata.toBuildersSet(update.getSummary().getMetadata()));
- }
-
- // Insert optional instance update overrides.
- Set<IRange> instanceOverrides =
- update.getInstructions().getSettings().getUpdateOnlyTheseInstances();
-
- if (!instanceOverrides.isEmpty()) {
- detailsMapper.insertInstanceOverrides(key, IRange.toBuildersSet(instanceOverrides));
- }
-
- // Insert desired state task config and instance mappings.
- if (update.getInstructions().isSetDesiredState()) {
- IInstanceTaskConfig desired = update.getInstructions().getDesiredState();
- detailsMapper.insertTaskConfig(
- key,
- taskConfigManager.insert(desired.getTask()),
- true,
- new InsertResult());
-
- detailsMapper.insertDesiredInstances(
- key,
- IRange.toBuildersSet(MorePreconditions.checkNotBlank(desired.getInstances())));
- }
-
- // Insert initial state task configs and instance mappings.
- if (!update.getInstructions().getInitialState().isEmpty()) {
- for (IInstanceTaskConfig config : update.getInstructions().getInitialState()) {
- InsertResult result = new InsertResult();
- detailsMapper.insertTaskConfig(
- key,
- taskConfigManager.insert(config.getTask()),
- false,
- result);
-
- detailsMapper.insertTaskConfigInstances(
- result.getId(),
- IRange.toBuildersSet(MorePreconditions.checkNotBlank(config.getInstances())));
- }
- }
- }
-
- @Timed("job_update_store_save_event")
- @Override
- public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
- jobEventMapper.insert(key, event.newBuilder());
- jobUpdateEventStats.getUnchecked(event.getStatus()).incrementAndGet();
- }
-
- @Timed("job_update_store_save_instance_event")
- @Override
- public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) {
- instanceEventMapper.insert(key, event.newBuilder());
- jobUpdateActionStats.getUnchecked(event.getAction()).incrementAndGet();
- }
-
- @Timed("job_update_store_delete_all")
- @Override
- public void deleteAllUpdatesAndEvents() {
- detailsMapper.truncate();
- }
-
- private static final Function<PruneVictim, IJobUpdateKey> GET_UPDATE_KEY =
- victim -> IJobUpdateKey.build(victim.getUpdate());
-
- @Timed("job_update_store_prune_history")
- @Override
- public Set<IJobUpdateKey> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs) {
- ImmutableSet.Builder<IJobUpdateKey> pruned = ImmutableSet.builder();
-
- Set<Long> jobKeyIdsToPrune = detailsMapper.selectJobKeysForPruning(
- perJobRetainCount,
- historyPruneThresholdMs);
-
- for (long jobKeyId : jobKeyIdsToPrune) {
- Set<PruneVictim> pruneVictims = detailsMapper.selectPruneVictims(
- jobKeyId,
- perJobRetainCount,
- historyPruneThresholdMs);
-
- detailsMapper.deleteCompletedUpdates(
- FluentIterable.from(pruneVictims).transform(PruneVictim::getRowId).toSet());
- pruned.addAll(FluentIterable.from(pruneVictims).transform(GET_UPDATE_KEY));
- }
-
- return pruned.build();
- }
-
- @Timed("job_update_store_fetch_summaries")
- @Override
- public List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query) {
- return IJobUpdateSummary.listFromBuilders(detailsMapper.selectSummaries(query.newBuilder()));
- }
-
- @Timed("job_update_store_fetch_details_list")
- @Override
- public List<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateQuery query) {
- return FluentIterable
- .from(detailsMapper.selectDetailsList(query.newBuilder()))
- .transform(DbStoredJobUpdateDetails::toThrift)
- .transform(StoredJobUpdateDetails::getDetails)
- .transform(IJobUpdateDetails::build)
- .toList();
- }
-
- @Timed("job_update_store_fetch_details")
- @Override
- public Optional<IJobUpdateDetails> fetchJobUpdateDetails(final IJobUpdateKey key) {
- return Optional.fromNullable(detailsMapper.selectDetails(key))
- .transform(DbStoredJobUpdateDetails::toThrift)
- .transform(StoredJobUpdateDetails::getDetails)
- .transform(IJobUpdateDetails::build);
- }
-
- @Timed("job_update_store_fetch_update")
- @Override
- public Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key) {
- return Optional.fromNullable(detailsMapper.selectUpdate(key))
- .transform(DbJobUpdate::toImmutable);
- }
-
- @Timed("job_update_store_fetch_instructions")
- @Override
- public Optional<IJobUpdateInstructions> fetchJobUpdateInstructions(IJobUpdateKey key) {
- return Optional.fromNullable(detailsMapper.selectInstructions(key))
- .transform(DbJobUpdateInstructions::toImmutable);
- }
-
- @Timed("job_update_store_fetch_all_details")
- @Override
- public Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails() {
- return FluentIterable.from(detailsMapper.selectAllDetails())
- .transform(DbStoredJobUpdateDetails::toThrift)
- .toSet();
- }
-
- @Timed("job_update_store_fetch_instance_events")
- @Override
- public List<IJobInstanceUpdateEvent> fetchInstanceEvents(IJobUpdateKey key, int instanceId) {
- return IJobInstanceUpdateEvent.listFromBuilders(
- detailsMapper.selectInstanceUpdateEvents(key, instanceId));
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/storage/db/DbLockStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbLockStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbLockStore.java
deleted file mode 100644
index 9e28550..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbLockStore.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.db;
-
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
-
-import com.google.inject.Inject;
-
-import org.apache.aurora.GuavaUtils;
-import org.apache.aurora.scheduler.storage.LockStore;
-import org.apache.aurora.scheduler.storage.db.views.LockRow;
-import org.apache.aurora.scheduler.storage.entities.ILock;
-import org.apache.aurora.scheduler.storage.entities.ILockKey;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.common.inject.TimedInterceptor.Timed;
-
-/**
- * A relational database-backed lock store.
- */
-class DbLockStore implements LockStore.Mutable {
-
- private final LockMapper mapper;
- private final LockKeyMapper lockKeyMapper;
-
- @Inject
- DbLockStore(LockMapper mapper, LockKeyMapper lockKeyMapper) {
- this.mapper = requireNonNull(mapper);
- this.lockKeyMapper = requireNonNull(lockKeyMapper);
- }
-
- @Timed("lock_store_save_lock")
- @Override
- public void saveLock(ILock lock) {
- lockKeyMapper.insert(lock.getKey());
- mapper.insert(lock.newBuilder());
- }
-
- @Timed("lock_store_remove_lock")
- @Override
- public void removeLock(ILockKey lockKey) {
- mapper.delete(lockKey.newBuilder());
- }
-
- @Timed("lock_store_delete_locks")
- @Override
- public void deleteLocks() {
- mapper.truncate();
- }
-
- @Timed("lock_store_fetch_locks")
- @Override
- public Set<ILock> fetchLocks() {
- return mapper.selectAll().stream().map(TO_ROW).collect(GuavaUtils.toImmutableSet());
- }
-
- @Timed("lock_store_fetch_lock")
- @Override
- public Optional<ILock> fetchLock(ILockKey lockKey) {
- return Optional.ofNullable(mapper.select(lockKey.newBuilder())).map(TO_ROW);
- }
-
- /**
- * LockRow converter to satisfy the ILock interface.
- */
- private static final Function<LockRow, ILock> TO_ROW = input -> ILock.build(input.getLock());
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
deleted file mode 100644
index 7bd37f7..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.db;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import javax.inject.Singleton;
-
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.AbstractScheduledService;
-import com.google.inject.AbstractModule;
-import com.google.inject.Key;
-import com.google.inject.Module;
-import com.google.inject.PrivateModule;
-import com.google.inject.TypeLiteral;
-import com.google.inject.util.Modules;
-
-import org.apache.aurora.common.inject.Bindings.KeyFactory;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.scheduler.SchedulerServicesModule;
-import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.async.GatedWorkQueue;
-import org.apache.aurora.scheduler.config.types.TimeAmount;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.LockStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.db.typehandlers.TypeHandlers;
-import org.apache.ibatis.migration.JavaMigrationLoader;
-import org.apache.ibatis.migration.MigrationLoader;
-import org.apache.ibatis.session.AutoMappingBehavior;
-import org.apache.ibatis.session.SqlSessionFactory;
-import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
-import org.mybatis.guice.MyBatisModule;
-import org.mybatis.guice.datasource.builtin.PooledDataSourceProvider;
-import org.mybatis.guice.datasource.helper.JdbcHelper;
-
-import static java.util.Objects.requireNonNull;
-
-import static com.google.inject.name.Names.bindProperties;
-
-/**
- * Binding module for a relational database storage system.
- */
-public final class DbModule extends PrivateModule {
-
- @Parameters(separators = "=")
- public static class Options {
- @Parameter(names = "-enable_db_metrics",
- description =
- "Whether to use MyBatis interceptor to measure the timing of intercepted Statements.",
- arity = 1)
- public boolean enableDbMetrics = true;
-
- @Parameter(names = "-slow_query_log_threshold",
- description = "Log all queries that take at least this long to execute.")
- public TimeAmount slowQueryLogThreshold = new TimeAmount(25, Time.MILLISECONDS);
-
- @Parameter(names = "-db_row_gc_interval",
- description = "Interval on which to scan the database for unused row references.")
- public TimeAmount dbRowGcInterval = new TimeAmount(2, Time.HOURS);
-
- // http://h2database.com/html/grammar.html#set_lock_timeout
- @Parameter(names = "-db_lock_timeout", description = "H2 table lock timeout")
- public TimeAmount h2LockTimeout = new TimeAmount(1, Time.MINUTES);
-
- @Parameter(names = "-db_max_active_connection_count",
- description = "Max number of connections to use with database via MyBatis")
- public int mybatisMaxActiveConnectionCount = -1;
-
- @Parameter(names = "-db_max_idle_connection_count",
- description = "Max number of idle connections to the database via MyBatis")
- public int mybatisMaxIdleConnectionCount = -1;
- }
-
- private static final Set<Class<?>> MAPPER_CLASSES = ImmutableSet.<Class<?>>builder()
- .add(AttributeMapper.class)
- .add(CronJobMapper.class)
- .add(EnumValueMapper.class)
- .add(FrameworkIdMapper.class)
- .add(JobInstanceUpdateEventMapper.class)
- .add(JobKeyMapper.class)
- .add(JobUpdateEventMapper.class)
- .add(JobUpdateDetailsMapper.class)
- .add(LockMapper.class)
- .add(MigrationMapper.class)
- .add(QuotaMapper.class)
- .add(TaskConfigMapper.class)
- .add(TaskMapper.class)
- .build();
-
- private final Options options;
- private final KeyFactory keyFactory;
- private final String jdbcSchema;
-
- private DbModule(
- Options options,
- KeyFactory keyFactory,
- String dbName,
- Map<String, String> jdbcUriArgs) {
-
- this.options = requireNonNull(options);
- this.keyFactory = requireNonNull(keyFactory);
-
- Map<String, String> args = ImmutableMap.<String, String>builder()
- .putAll(jdbcUriArgs)
- // READ COMMITTED transaction isolation. More details here
- // http://www.h2database.com/html/advanced.html?#transaction_isolation
- .put("LOCK_MODE", "3")
- // Send log messages from H2 to SLF4j
- // See http://www.h2database.com/html/features.html#other_logging
- .put("TRACE_LEVEL_FILE", "4")
- // Enable Query Statistics
- .put("QUERY_STATISTICS", "TRUE")
- // Configure the lock timeout
- .put("LOCK_TIMEOUT", options.h2LockTimeout.as(Time.MILLISECONDS).toString())
- .build();
- this.jdbcSchema = dbName + ";" + Joiner.on(";").withKeyValueSeparator("=").join(args);
- }
-
- /**
- * Creates a module that will prepare a volatile storage system suitable for use in a production
- * environment.
- *
- * @param keyFactory Binding scope for the storage system.
- * @return A new database module for production.
- */
- public static Module productionModule(KeyFactory keyFactory, DbModule.Options options) {
- return new DbModule(
- options,
- keyFactory,
- "aurora",
- ImmutableMap.of("DB_CLOSE_DELAY", "-1"));
- }
-
- @VisibleForTesting
- public static Module testModule(KeyFactory keyFactory) {
- DbModule.Options options = new DbModule.Options();
- return new DbModule(
- options,
- keyFactory,
- "testdb-" + UUID.randomUUID().toString(),
- // A non-zero close delay is used here to avoid eager database cleanup in tests that
- // make use of multiple threads. Since all test databases are separately scoped by the
- // included UUID, multiple DB instances will overlap in time but they should be distinct
- // in content.
- ImmutableMap.of("DB_CLOSE_DELAY", "5"));
- }
-
- /**
- * Same as {@link #testModuleWithWorkQueue(KeyFactory)} but with default task store and
- * key factory.
- *
- * @return A new database module for testing.
- */
- @VisibleForTesting
- public static Module testModule() {
- return testModule(KeyFactory.PLAIN);
- }
-
- /**
- * Creates a module that will prepare a private in-memory database, using a specific task store
- * implementation bound within the key factory and provided module.
- *
- * @param keyFactory Key factory to use.
- * @return A new database module for testing.
- */
- @VisibleForTesting
- public static Module testModuleWithWorkQueue(KeyFactory keyFactory) {
- return Modules.combine(
- new AbstractModule() {
- @Override
- protected void configure() {
- bind(GatedWorkQueue.class).annotatedWith(AsyncExecutor.class).toInstance(
- new GatedWorkQueue() {
- @Override
- public <T, E extends Exception> T closeDuring(
- GatedOperation<T, E> operation) throws E {
-
- return operation.doWithGateClosed();
- }
- });
- }
- },
- testModule(keyFactory)
- );
- }
-
- /**
- * Same as {@link #testModuleWithWorkQueue(KeyFactory)} but with default key factory.
- *
- * @return A new database module for testing.
- */
- @VisibleForTesting
- public static Module testModuleWithWorkQueue() {
- return testModuleWithWorkQueue(KeyFactory.PLAIN);
- }
-
- private <T> void bindStore(Class<T> binding, Class<? extends T> impl) {
- bind(binding).to(impl);
- bind(impl).in(Singleton.class);
- Key<T> key = keyFactory.create(binding);
- bind(key).to(impl);
- expose(key);
- }
-
- @Override
- protected void configure() {
- install(new MyBatisModule() {
- @Override
- protected void initialize() {
- if (options.enableDbMetrics) {
- addInterceptorClass(InstrumentingInterceptor.class);
- }
-
- bindProperties(binder(), ImmutableMap.of("JDBC.schema", jdbcSchema));
- install(JdbcHelper.H2_IN_MEMORY_NAMED);
-
- // We have no plans to take advantage of multiple DB environments. This is a
- // required property though, so we use an unnamed environment.
- environmentId("");
-
- bindTransactionFactoryType(JdbcTransactionFactory.class);
- bindDataSourceProviderType(PooledDataSourceProvider.class);
- addMapperClasses(MAPPER_CLASSES);
-
- // Full auto-mapping enables population of nested objects with minimal mapper configuration.
- // Docs on settings can be found here:
- // http://mybatis.github.io/mybatis-3/configuration.html#settings
- autoMappingBehavior(AutoMappingBehavior.FULL);
-
- addTypeHandlersClasses(TypeHandlers.getAll());
-
- bind(new TypeLiteral<Amount<Long, Time>>() { })
- .toInstance(options.slowQueryLogThreshold);
-
- // Enable a ping query which will prevent the use of invalid connections in the
- // connection pool.
- bindProperties(binder(), ImmutableMap.of("mybatis.pooled.pingEnabled", "true"));
- bindProperties(binder(), ImmutableMap.of("mybatis.pooled.pingQuery", "SELECT 1;"));
-
- if (options.mybatisMaxActiveConnectionCount > 0) {
- String val = String.valueOf(options.mybatisMaxActiveConnectionCount);
- bindProperties(binder(), ImmutableMap.of("mybatis.pooled.maximumActiveConnections", val));
- }
-
- if (options.mybatisMaxIdleConnectionCount > 0) {
- String val = String.valueOf(options.mybatisMaxIdleConnectionCount);
- bindProperties(binder(), ImmutableMap.of("mybatis.pooled.maximumIdleConnections", val));
- }
-
- // Exposed for unit tests.
- bind(TaskConfigManager.class);
- expose(TaskConfigManager.class);
-
- // TODO(wfarner): Don't expose these bindings once the task store is directly bound here.
- expose(TaskMapper.class);
- expose(TaskConfigManager.class);
- expose(JobKeyMapper.class);
- }
- });
- expose(keyFactory.create(CronJobStore.Mutable.class));
- expose(keyFactory.create(TaskStore.Mutable.class));
-
- bindStore(AttributeStore.Mutable.class, DbAttributeStore.class);
- bindStore(LockStore.Mutable.class, DbLockStore.class);
- bindStore(QuotaStore.Mutable.class, DbQuotaStore.class);
- bindStore(SchedulerStore.Mutable.class, DbSchedulerStore.class);
- bindStore(JobUpdateStore.Mutable.class, DbJobUpdateStore.class);
- bindStore(TaskStore.Mutable.class, DbTaskStore.class);
- bindStore(CronJobStore.Mutable.class, DbCronJobStore.class);
-
- Key<Storage> storageKey = keyFactory.create(Storage.class);
- bind(storageKey).to(DbStorage.class);
- bind(DbStorage.class).in(Singleton.class);
- expose(storageKey);
-
- bind(EnumBackfill.class).to(EnumBackfill.EnumBackfillImpl.class);
- bind(EnumBackfill.EnumBackfillImpl.class).in(Singleton.class);
- expose(EnumBackfill.class);
-
- expose(DbStorage.class);
- expose(SqlSessionFactory.class);
- expose(TaskMapper.class);
- expose(TaskConfigMapper.class);
- expose(JobKeyMapper.class);
- }
-
- /**
- * Module that sets up a periodic database garbage-collection routine.
- */
- public static class GarbageCollectorModule extends AbstractModule {
-
- private final Options options;
-
- public GarbageCollectorModule(Options options) {
- this.options = options;
- }
-
- @Override
- protected void configure() {
- install(new PrivateModule() {
- @Override
- protected void configure() {
- bind(RowGarbageCollector.class).in(Singleton.class);
- bind(AbstractScheduledService.Scheduler.class).toInstance(
- AbstractScheduledService.Scheduler.newFixedRateSchedule(
- 0L,
- options.dbRowGcInterval.getValue(),
- options.dbRowGcInterval.getUnit().getTimeUnit()));
- expose(RowGarbageCollector.class);
- }
- });
- SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
- .to(RowGarbageCollector.class);
- }
- }
-
- public static class MigrationManagerModule extends PrivateModule {
- private static final String MIGRATION_PACKAGE =
- "org.apache.aurora.scheduler.storage.db.migration";
-
- private final MigrationLoader migrationLoader;
-
- public MigrationManagerModule() {
- this.migrationLoader = new JavaMigrationLoader(MIGRATION_PACKAGE);
- }
-
- public MigrationManagerModule(MigrationLoader migrationLoader) {
- this.migrationLoader = requireNonNull(migrationLoader);
- }
-
- @Override
- protected void configure() {
- bind(MigrationLoader.class).toInstance(migrationLoader);
-
- bind(MigrationManager.class).to(MigrationManagerImpl.class);
- expose(MigrationManager.class);
- }
- }
-}