You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/11/22 00:44:05 UTC
[1/2] incubator-aurora git commit: Remove ReadWriteLock from MemStorage, remove Storage#weaklyConsistentRead.
Repository: incubator-aurora
Updated Branches:
refs/heads/master ecc3fbcac -> 5116c2209
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/db/DbQuotaStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbQuotaStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbQuotaStoreTest.java
index 6d8d52a..dc27c3e 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbQuotaStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbQuotaStoreTest.java
@@ -97,7 +97,7 @@ public class DbQuotaStoreTest {
}
private Optional<IResourceAggregate> select(final String role) {
- return storage.consistentRead(new Work.Quiet<Optional<IResourceAggregate>>() {
+ return storage.read(new Work.Quiet<Optional<IResourceAggregate>>() {
@Override
public Optional<IResourceAggregate> apply(StoreProvider storeProvider) {
return storeProvider.getQuotaStore().fetchQuota(role);
@@ -108,7 +108,7 @@ public class DbQuotaStoreTest {
private void assertQuotas(Map<String, IResourceAggregate> quotas) {
assertEquals(
quotas,
- storage.consistentRead(new Work.Quiet<Map<String, IResourceAggregate>>() {
+ storage.read(new Work.Quiet<Map<String, IResourceAggregate>>() {
@Override
public Map<String, IResourceAggregate> apply(StoreProvider storeProvider) {
return storeProvider.getQuotaStore().fetchQuotas();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/db/DbSchedulerStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbSchedulerStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbSchedulerStoreTest.java
index 9c00c8b..0bfb4d4 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbSchedulerStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbSchedulerStoreTest.java
@@ -55,7 +55,7 @@ public class DbSchedulerStoreTest {
}
private Optional<String> select() {
- return storage.consistentRead(new Work.Quiet<Optional<String>>() {
+ return storage.read(new Work.Quiet<Optional<String>>() {
@Override
public Optional<String> apply(StoreProvider storeProvider) {
return storeProvider.getSchedulerStore().fetchFrameworkId();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index 259c6a9..0a5cc51 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.storage.log;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
import com.google.common.base.Function;
import com.google.common.base.Functions;
@@ -177,7 +178,8 @@ public class LogStorageTest extends EasyMockTest {
storageUtil.quotaStore,
storageUtil.attributeStore,
storageUtil.jobUpdateStore,
- eventSink);
+ eventSink,
+ new ReentrantLock());
stream = createMock(Stream.class);
streamMatcher = LogOpMatcher.matcherFor(stream);
@@ -481,7 +483,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws CodingException {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
storageUtil.schedulerStore.saveFrameworkId(frameworkId);
streamMatcher.expectTransaction(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)))
.andReturn(position);
@@ -502,7 +504,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
storageUtil.jobStore.saveAcceptedJob(managerId, jobConfig);
streamMatcher.expectTransaction(
Op.saveAcceptedJob(new SaveAcceptedJob(managerId, jobConfig.newBuilder())))
@@ -521,7 +523,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
storageUtil.jobStore.removeJob(JOB_KEY);
streamMatcher.expectTransaction(
Op.removeJob(new RemoveJob().setJobKey(JOB_KEY.newBuilder())))
@@ -541,7 +543,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
storageUtil.taskStore.saveTasks(tasks);
streamMatcher.expectTransaction(
Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(tasks))))
@@ -564,7 +566,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated);
streamMatcher.expectTransaction(
Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated))))
@@ -587,7 +589,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
expect(storageUtil.taskStore.unsafeModifyInPlace(taskId2, updatedConfig)).andReturn(false);
expect(storageUtil.taskStore.unsafeModifyInPlace(taskId, updatedConfig)).andReturn(true);
streamMatcher.expectTransaction(
@@ -614,7 +616,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated);
storageUtil.taskStore.deleteTasks(tasksToRemove);
@@ -649,7 +651,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
storageUtil.taskStore.saveTasks(saved);
// Nested transaction with result.
@@ -679,7 +681,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
storageUtil.taskStore.saveTasks(saved);
// Nested transaction with result.
@@ -710,7 +712,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
storageUtil.taskStore.deleteTasks(taskIds);
streamMatcher.expectTransaction(Op.removeTasks(new RemoveTasks(taskIds)))
.andReturn(position);
@@ -729,7 +731,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
storageUtil.taskStore.deleteTasks(taskIds);
streamMatcher.expectTransaction(Op.removeTasks(new RemoveTasks(taskIds)))
.andReturn(position);
@@ -751,7 +753,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
storageUtil.quotaStore.saveQuota(role, quota);
streamMatcher.expectTransaction(Op.saveQuota(new SaveQuota(role, quota.newBuilder())))
.andReturn(position);
@@ -770,7 +772,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
storageUtil.quotaStore.removeQuota(role);
streamMatcher.expectTransaction(Op.removeQuota(new RemoveQuota(role))).andReturn(position);
}
@@ -792,7 +794,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
storageUtil.lockStore.saveLock(lock);
streamMatcher.expectTransaction(Op.saveLock(new SaveLock(lock.newBuilder())))
.andReturn(position);
@@ -812,7 +814,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
storageUtil.lockStore.removeLock(lockKey);
streamMatcher.expectTransaction(Op.removeLock(new RemoveLock(lockKey.newBuilder())))
.andReturn(position);
@@ -838,7 +840,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
expect(storageUtil.attributeStore.getHostAttributes(host))
.andReturn(Optional.<IHostAttributes>absent());
@@ -902,7 +904,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
storageUtil.jobUpdateStore.saveJobUpdate(update, lockToken);
streamMatcher.expectTransaction(
Op.saveJobUpdate(new SaveJobUpdate(update.newBuilder(), lockToken.orNull())))
@@ -925,7 +927,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
storageUtil.jobUpdateStore.saveJobUpdateEvent(event, UPDATE_ID);
streamMatcher.expectTransaction(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(
event.newBuilder(),
@@ -949,7 +951,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(event, UPDATE_ID);
streamMatcher.expectTransaction(Op.saveJobInstanceUpdateEvent(
new SaveJobInstanceUpdateEvent(event.newBuilder(), UPDATE_ID))).andReturn(position);
@@ -971,7 +973,7 @@ public class LogStorageTest extends EasyMockTest {
new MutationFixture() {
@Override
protected void setupExpectations() throws Exception {
- storageUtil.expectWriteOperation();
+ storageUtil.expectWrite();
expect(storageUtil.jobUpdateStore.pruneHistory(
pruneHistory.getPerJobRetainCount(),
pruneHistory.getHistoryPruneThresholdMs())).andReturn(ImmutableSet.of("id1"));
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java
index 463b445..30e2328 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java
@@ -45,7 +45,6 @@ import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.Storage.Work;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.junit.Before;
import org.junit.Test;
@@ -58,7 +57,6 @@ import static org.junit.Assert.fail;
public class MemStorageTest extends TearDownTestCase {
private ExecutorService executor;
- private FakeStatsProvider statsProvider;
private Storage storage;
@Before
@@ -71,8 +69,7 @@ public class MemStorageTest extends TearDownTestCase {
new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
}
});
- statsProvider = new FakeStatsProvider();
- storage = MemStorage.newEmptyStorage(statsProvider);
+ storage = MemStorage.newEmptyStorage();
}
@Test
@@ -85,7 +82,7 @@ public class MemStorageTest extends TearDownTestCase {
Future<String> future = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
- return storage.consistentRead(new Work.Quiet<String>() {
+ return storage.read(new Work.Quiet<String>() {
@Override
public String apply(StoreProvider storeProvider) {
slowReadStarted.countDown();
@@ -102,7 +99,7 @@ public class MemStorageTest extends TearDownTestCase {
slowReadStarted.await();
- String fastResult = storage.consistentRead(new Work.Quiet<String>() {
+ String fastResult = storage.read(new Work.Quiet<String>() {
@Override
public String apply(StoreProvider storeProvider) {
return "fastResult";
@@ -111,13 +108,6 @@ public class MemStorageTest extends TearDownTestCase {
assertEquals("fastResult", fastResult);
slowReadFinished.countDown();
assertEquals("slowResult", future.get());
-
- // It would be nice to check the stat values (specifically with simulated lock contention, but
- // this value is based off ReentrantReadWriteLock#getQueueLength(), which is documented as an
- // approximation and not a guaranteed accurate value.
- assertEquals(
- ImmutableSet.of(MemStorage.THREADS_WAITING_GAUGE),
- statsProvider.getAllValues().keySet());
}
private IScheduledTask makeTask(String taskId) {
@@ -143,7 +133,7 @@ public class MemStorageTest extends TearDownTestCase {
}
private void expectTasks(final String... taskIds) {
- storage.consistentRead(new Work.Quiet<Void>() {
+ storage.read(new Work.Quiet<Void>() {
@Override
public Void apply(StoreProvider storeProvider) {
Query.Builder query = Query.unscoped();
@@ -174,12 +164,12 @@ public class MemStorageTest extends TearDownTestCase {
// Expected
}
- storage.consistentRead(new Work.Quiet<Void>() {
+ storage.read(new Work.Quiet<Void>() {
@Override
public Void apply(StoreProvider storeProvider) {
// If the previous write was under a transaction then there would be no quota records.
assertEquals(ImmutableMap.<String, IResourceAggregate>of(),
- storeProvider.getQuotaStore().fetchQuotas());
+ storeProvider.getQuotaStore().fetchQuotas());
return null;
}
});
@@ -221,7 +211,7 @@ public class MemStorageTest extends TearDownTestCase {
}
});
expectTasks("a");
- storage.consistentRead(new Work.Quiet<Void>() {
+ storage.read(new Work.Quiet<Void>() {
@Override
public Void apply(StoreProvider storeProvider) {
assertEquals(
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
index 6918cba..d492e17 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
@@ -73,9 +73,9 @@ public class StorageTestUtil {
this.storage = easyMock.createMock(NonVolatileStorage.class);
}
- public <T> IExpectationSetters<T> expectConsistentRead() {
+ public <T> IExpectationSetters<T> expectRead() {
final Capture<Work<T, RuntimeException>> work = EasyMockTest.createCapture();
- return expect(storage.<T, RuntimeException>consistentRead(capture(work)))
+ return expect(storage.read(capture(work)))
.andAnswer(new IAnswer<T>() {
@Override
public T answer() {
@@ -84,20 +84,9 @@ public class StorageTestUtil {
});
}
- public <T> IExpectationSetters<T> expectWeaklyConsistentRead() {
- final Capture<Work<T, RuntimeException>> work = EasyMockTest.createCapture();
- return expect(storage.<T, RuntimeException>weaklyConsistentRead(capture(work)))
- .andAnswer(new IAnswer<T>() {
- @Override
- public T answer() {
- return work.getValue().apply(storeProvider);
- }
- });
- }
-
- public <T> IExpectationSetters<T> expectWriteOperation() {
+ public <T> IExpectationSetters<T> expectWrite() {
final Capture<MutateWork<T, RuntimeException>> work = EasyMockTest.createCapture();
- return expect(storage.<T, RuntimeException>write(capture(work))).andAnswer(new IAnswer<T>() {
+ return expect(storage.write(capture(work))).andAnswer(new IAnswer<T>() {
@Override
public T answer() {
return work.getValue().apply(mutableStoreProvider);
@@ -124,9 +113,8 @@ public class StorageTestUtil {
expect(mutableStoreProvider.getLockStore()).andReturn(lockStore).anyTimes();
expect(mutableStoreProvider.getSchedulerStore()).andReturn(schedulerStore).anyTimes();
expect(mutableStoreProvider.getJobUpdateStore()).andReturn(jobUpdateStore).anyTimes();
- expectConsistentRead().anyTimes();
- expectWeaklyConsistentRead().anyTimes();
- expectWriteOperation().anyTimes();
+ expectRead().anyTimes();
+ expectWrite().anyTimes();
}
public IExpectationSetters<?> expectTaskFetch(
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index 2533d82..4c827b1 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -217,7 +217,7 @@ public class JobUpdaterIT extends EasyMockTest {
private String getTaskId(IJobKey job, int instanceId) {
return Tasks.id(Iterables.getOnlyElement(
- Storage.Util.consistentFetchTasks(
+ Storage.Util.fetchTasks(
storage,
Query.instanceScoped(job, instanceId).active())));
}
@@ -271,7 +271,7 @@ public class JobUpdaterIT extends EasyMockTest {
JobUpdateStatus expected,
Multimap<Integer, JobUpdateAction> expectedActions) {
- IJobUpdateDetails details = storage.consistentRead(new Work.Quiet<IJobUpdateDetails>() {
+ IJobUpdateDetails details = storage.read(new Work.Quiet<IJobUpdateDetails>() {
@Override
public IJobUpdateDetails apply(StoreProvider storeProvider) {
return storeProvider.getJobUpdateStore().fetchJobUpdateDetails(UPDATE_ID).get();
@@ -314,7 +314,7 @@ public class JobUpdaterIT extends EasyMockTest {
private void assertJobState(IJobKey job, Map<Integer, ITaskConfig> expected) {
Iterable<IScheduledTask> tasks =
- Storage.Util.consistentFetchTasks(storage, Query.jobScoped(job).active());
+ Storage.Util.fetchTasks(storage, Query.jobScoped(job).active());
Map<Integer, IScheduledTask> tasksByInstance =
Maps.uniqueIndex(tasks, Tasks.SCHEDULED_TO_INSTANCE_ID);
[2/2] incubator-aurora git commit: Remove ReadWriteLock from MemStorage, remove Storage#weaklyConsistentRead.
Posted by wf...@apache.org.
Remove ReadWriteLock from MemStorage, remove Storage#weaklyConsistentRead.
Bugs closed: AURORA-929
Reviewed at https://reviews.apache.org/r/28097/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/5116c220
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/5116c220
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/5116c220
Branch: refs/heads/master
Commit: 5116c22098ccef2dbc35ad1b71992f2868406e32
Parents: ecc3fbc
Author: Bill Farner <wf...@apache.org>
Authored: Wed Nov 19 15:35:04 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Fri Nov 21 15:42:39 2014 -0800
----------------------------------------------------------------------
.../org/apache/aurora/scheduler/TaskVars.java | 2 +-
.../scheduler/async/GcExecutorLauncher.java | 4 +-
.../aurora/scheduler/async/KillRetry.java | 2 +-
.../scheduler/async/RescheduleCalculator.java | 5 +-
.../scheduler/async/TaskHistoryPruner.java | 5 +-
.../async/preemptor/LiveClusterState.java | 2 +-
.../scheduler/async/preemptor/Preemptor.java | 5 +-
.../scheduler/cron/quartz/AuroraCronJob.java | 2 +-
.../cron/quartz/CronJobManagerImpl.java | 6 +-
.../aurora/scheduler/http/Maintenance.java | 2 +-
.../org/apache/aurora/scheduler/http/Mname.java | 2 +-
.../apache/aurora/scheduler/http/Quotas.java | 2 +-
.../apache/aurora/scheduler/http/Slaves.java | 2 +-
.../aurora/scheduler/http/StructDump.java | 2 +-
.../scheduler/mesos/SchedulerDriverService.java | 2 +-
.../aurora/scheduler/quota/QuotaManager.java | 2 +-
.../aurora/scheduler/sla/MetricCalculator.java | 2 +-
.../aurora/scheduler/state/LockManagerImpl.java | 4 +-
.../scheduler/state/MaintenanceController.java | 4 +-
.../aurora/scheduler/stats/ResourceCounter.java | 4 +-
.../storage/CallOrderEnforcingStorage.java | 12 +-
.../scheduler/storage/ReadWriteLockManager.java | 131 -------------------
.../aurora/scheduler/storage/Storage.java | 58 ++------
.../storage/backup/TemporaryStorage.java | 2 +-
.../aurora/scheduler/storage/db/DbStorage.java | 29 ++--
.../scheduler/storage/log/LogStorage.java | 109 ++++++++-------
.../scheduler/storage/log/LogStorageModule.java | 23 ++--
.../storage/log/SnapshotStoreImpl.java | 2 +-
.../scheduler/storage/mem/MemStorage.java | 112 ++--------------
.../thrift/SchedulerThriftInterface.java | 14 +-
.../app/local/FakeNonVolatileStorage.java | 11 +-
.../cron/quartz/CronJobManagerImplTest.java | 2 +-
.../filter/SchedulingFilterImplTest.java | 25 ----
.../scheduler/state/LockManagerImplTest.java | 2 +-
.../scheduler/state/StateManagerImplTest.java | 10 +-
.../storage/ReadWriteLockManagerTest.java | 109 ---------------
.../scheduler/storage/StorageBackfillTest.java | 8 +-
.../storage/db/DBJobUpdateStoreTest.java | 12 +-
.../storage/db/DbAttributeStoreTest.java | 4 +-
.../scheduler/storage/db/DbLockStoreTest.java | 4 +-
.../scheduler/storage/db/DbQuotaStoreTest.java | 4 +-
.../storage/db/DbSchedulerStoreTest.java | 2 +-
.../scheduler/storage/log/LogStorageTest.java | 44 ++++---
.../scheduler/storage/mem/MemStorageTest.java | 24 +---
.../storage/testing/StorageTestUtil.java | 24 +---
.../aurora/scheduler/updater/JobUpdaterIT.java | 6 +-
46 files changed, 205 insertions(+), 639 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/TaskVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskVars.java b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
index 2f93054..f017cdd 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskVars.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
@@ -167,7 +167,7 @@ class TaskVars extends AbstractIdleService implements EventSubscriber {
if (Strings.isNullOrEmpty(task.getAssignedTask().getSlaveHost())) {
rack = Optional.absent();
} else {
- rack = storage.consistentRead(new Work.Quiet<Optional<String>>() {
+ rack = storage.read(new Work.Quiet<Optional<String>>() {
@Override
public Optional<String> apply(StoreProvider storeProvider) {
Optional<IAttribute> rack = FluentIterable
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
index e02921d..5226e3d 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
@@ -179,9 +179,7 @@ public class GcExecutorLauncher implements TaskLauncher {
}
private TaskInfo makeGcTask(String hostName, SlaveID slaveId) {
- Set<IScheduledTask> tasksOnHost =
- Storage.Util.weaklyConsistentFetchTasks(storage, Query.slaveScoped(hostName));
-
+ Set<IScheduledTask> tasksOnHost = Storage.Util.fetchTasks(storage, Query.slaveScoped(hostName));
tasksCreated.incrementAndGet();
return makeGcTask(
hostName,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java b/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
index d6b7fab..3bb80ec 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
@@ -88,7 +88,7 @@ public class KillRetry implements EventSubscriber {
@Override
public void run() {
Query.Builder query = Query.taskScoped(taskId).byStatus(ScheduleStatus.KILLING);
- if (!Storage.Util.weaklyConsistentFetchTasks(storage, query).isEmpty()) {
+ if (!Storage.Util.fetchTasks(storage, query).isEmpty()) {
LOG.info("Task " + taskId + " not yet killed, retrying.");
// Kill did not yet take effect, try again.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
index ca54c9a..0cf7fb4 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
@@ -25,7 +25,6 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.twitter.common.quantity.Amount;
@@ -147,8 +146,8 @@ public interface RescheduleCalculator {
return Optional.absent();
}
- ImmutableSet<IScheduledTask> res =
- Storage.Util.weaklyConsistentFetchTasks(storage, Query.taskScoped(task.getAncestorId()));
+ Set<IScheduledTask> res =
+ Storage.Util.fetchTasks(storage, Query.taskScoped(task.getAncestorId()));
return Optional.fromNullable(Iterables.getOnlyElement(res, null));
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
index 58d074b..985a319 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
@@ -13,7 +13,6 @@
*/
package org.apache.aurora.scheduler.async;
-import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -156,8 +155,8 @@ public class TaskHistoryPruner implements EventSubscriber {
executor.submit(new Runnable() {
@Override
public void run() {
- Collection<IScheduledTask> inactiveTasks =
- Storage.Util.weaklyConsistentFetchTasks(storage, jobHistoryQuery(jobKey));
+ Set<IScheduledTask> inactiveTasks =
+ Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey));
int tasksToPrune = inactiveTasks.size() - settings.perJobHistoryGoal;
if (tasksToPrune > 0 && inactiveTasks.size() > settings.perJobHistoryGoal) {
Set<String> toPrune = FluentIterable
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
index 0da4d2a..9d83acc 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
@@ -59,7 +59,7 @@ class LiveClusterState implements ClusterState {
public Multimap<String, IAssignedTask> getSlavesToActiveTasks() {
// Only non-pending active tasks may be preempted.
Iterable<IAssignedTask> activeTasks = Iterables.transform(
- Storage.Util.consistentFetchTasks(storage, CANDIDATE_QUERY),
+ Storage.Util.fetchTasks(storage, CANDIDATE_QUERY),
SCHEDULED_TO_ASSIGNED);
// Group the tasks by slave id so they can be paired with offers from the same slave.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
index afbd645..767e601 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
@@ -53,6 +53,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.Util;
+import org.apache.aurora.scheduler.storage.Storage.Work;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -275,7 +276,7 @@ public interface Preemptor {
}
private Optional<IHostAttributes> getHostAttributes(final String host) {
- return storage.weaklyConsistentRead(new Storage.Work.Quiet<Optional<IHostAttributes>>() {
+ return storage.read(new Work.Quiet<Optional<IHostAttributes>>() {
@Override
public Optional<IHostAttributes> apply(Storage.StoreProvider storeProvider) {
return storeProvider.getAttributeStore().getHostAttributes(host);
@@ -294,7 +295,7 @@ public interface Preemptor {
private Optional<IAssignedTask> fetchIdlePendingTask(String taskId) {
Query.Builder query = Query.taskScoped(taskId).byStatus(PENDING);
Iterable<IAssignedTask> result = FluentIterable
- .from(Util.consistentFetchTasks(storage, query))
+ .from(Util.fetchTasks(storage, query))
.filter(isIdleTask)
.transform(SCHEDULED_TO_ASSIGNED);
return Optional.fromNullable(Iterables.getOnlyElement(result, null));
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
index 84e37e4..efea7ad 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
@@ -205,7 +205,7 @@ class AuroraCronJob implements Job {
delayedStartBackoff.doUntilSuccess(new Supplier<Boolean>() {
@Override
public Boolean get() {
- if (Storage.Util.consistentFetchTasks(storage, query).isEmpty()) {
+ if (Storage.Util.fetchTasks(storage, query).isEmpty()) {
LOG.info("Initiating delayed launch of cron " + path);
storage.write(new Storage.MutateWork.NoResult.Quiet() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java
index 8e2d3d9..28f1ae7 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java
@@ -74,7 +74,7 @@ class CronJobManagerImpl implements CronJobManager {
public void startJobNow(final IJobKey jobKey) throws CronException {
requireNonNull(jobKey);
- storage.weaklyConsistentRead(new Work<Void, CronException>() {
+ storage.read(new Work<Void, CronException>() {
@Override
public Void apply(Storage.StoreProvider storeProvider) throws CronException {
checkCronExists(jobKey, storeProvider.getJobStore());
@@ -182,7 +182,7 @@ class CronJobManagerImpl implements CronJobManager {
@Override
public Iterable<IJobConfiguration> getJobs() {
// NOTE: no synchronization is needed here since we don't touch internal quartz state.
- return storage.consistentRead(new Work.Quiet<Iterable<IJobConfiguration>>() {
+ return storage.read(new Work.Quiet<Iterable<IJobConfiguration>>() {
@Override
public Iterable<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
return storeProvider.getJobStore().fetchJobs(getManagerKey());
@@ -194,7 +194,7 @@ class CronJobManagerImpl implements CronJobManager {
public boolean hasJob(final IJobKey jobKey) {
requireNonNull(jobKey);
- return storage.consistentRead(new Work.Quiet<Boolean>() {
+ return storage.read(new Work.Quiet<Boolean>() {
@Override
public Boolean apply(Storage.StoreProvider storeProvider) {
return storeProvider.getJobStore().fetchJob(getManagerKey(), jobKey).isPresent();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java b/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
index be8a1ff..303f05c 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
@@ -58,7 +58,7 @@ public class Maintenance {
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getHosts() {
- return storage.weaklyConsistentRead(new Work.Quiet<Response>() {
+ return storage.read(new Work.Quiet<Response>() {
@Override
public Response apply(StoreProvider storeProvider) {
Multimap<MaintenanceMode, String> hostsByMode =
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/http/Mname.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Mname.java b/src/main/java/org/apache/aurora/scheduler/http/Mname.java
index 883f954..69ce2aa 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Mname.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Mname.java
@@ -185,7 +185,7 @@ public class Mname {
Optional<String> forwardRequest) {
IScheduledTask task = Iterables.getOnlyElement(
- Storage.Util.consistentFetchTasks(storage,
+ Storage.Util.fetchTasks(storage,
Query.instanceScoped(JobKeys.from(role, env, job), instanceId).active()),
null);
if (task == null) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/http/Quotas.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Quotas.java b/src/main/java/org/apache/aurora/scheduler/http/Quotas.java
index 5f3cce3..e1bf0cb 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Quotas.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Quotas.java
@@ -56,7 +56,7 @@ public class Quotas {
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getQuotas(@QueryParam("role") final String role) {
- return storage.weaklyConsistentRead(new Work.Quiet<Response>() {
+ return storage.read(new Work.Quiet<Response>() {
@Override
public Response apply(StoreProvider storeProvider) {
Map<String, IResourceAggregate> quotas;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/http/Slaves.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Slaves.java b/src/main/java/org/apache/aurora/scheduler/http/Slaves.java
index 0ea462f..b64e18c 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Slaves.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Slaves.java
@@ -63,7 +63,7 @@ public class Slaves extends JerseyTemplateServlet {
}
private Iterable<IHostAttributes> getHostAttributes() {
- return storage.weaklyConsistentRead(new Work.Quiet<Iterable<IHostAttributes>>() {
+ return storage.read(new Work.Quiet<Iterable<IHostAttributes>>() {
@Override
public Iterable<IHostAttributes> apply(StoreProvider storeProvider) {
return storeProvider.getAttributeStore().getHostAttributes();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
index 8147d54..12b28be 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
@@ -122,7 +122,7 @@ public class StructDump extends JerseyTemplateServlet {
@Override
public void execute(StringTemplate template) {
template.setAttribute("id", id);
- Optional<? extends TBase<?, ?>> struct = storage.weaklyConsistentRead(work);
+ Optional<? extends TBase<?, ?>> struct = storage.read(work);
if (struct.isPresent()) {
template.setAttribute("structPretty", Util.prettyPrint(struct.get()));
} else {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
index 88150e5..da2d5df 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
@@ -68,7 +68,7 @@ class SchedulerDriverService extends AbstractIdleService implements Driver {
@Override
protected void startUp() {
- Optional<String> frameworkId = storage.consistentRead(
+ Optional<String> frameworkId = storage.read(
new Storage.Work.Quiet<Optional<String>>() {
@Override
public Optional<String> apply(Storage.StoreProvider storeProvider) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index e38407e..934b920 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -193,7 +193,7 @@ public interface QuotaManager {
* @return {@code QuotaInfo} with quota and consumption details.
*/
private QuotaInfo getQuotaInfo(final String role, final Optional<IJobUpdate> requestedUpdate) {
- return storage.consistentRead(new Work.Quiet<QuotaInfo>() {
+ return storage.read(new Work.Quiet<QuotaInfo>() {
@Override
public QuotaInfo apply(StoreProvider storeProvider) {
FluentIterable<IScheduledTask> tasks = FluentIterable.from(
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java b/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
index 149bb33..82f36d5 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
@@ -168,7 +168,7 @@ class MetricCalculator implements Runnable {
@Override
public void run() {
FluentIterable<IScheduledTask> tasks =
- FluentIterable.from(Storage.Util.weaklyConsistentFetchTasks(storage, Query.unscoped()));
+ FluentIterable.from(Storage.Util.fetchTasks(storage, Query.unscoped()));
List<IScheduledTask> prodTasks = tasks.filter(Predicates.compose(
Predicates.and(Tasks.IS_PRODUCTION, IS_SERVICE),
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
index f167290..6aa281c 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
@@ -96,7 +96,7 @@ public class LockManagerImpl implements LockManager {
public void validateIfLocked(final ILockKey context, Optional<ILock> heldLock)
throws LockException {
- Optional<ILock> stored = storage.consistentRead(new Work.Quiet<Optional<ILock>>() {
+ Optional<ILock> stored = storage.read(new Work.Quiet<Optional<ILock>>() {
@Override
public Optional<ILock> apply(StoreProvider storeProvider) {
return storeProvider.getLockStore().fetchLock(context);
@@ -125,7 +125,7 @@ public class LockManagerImpl implements LockManager {
@Override
public Iterable<ILock> getLocks() {
- return storage.weaklyConsistentRead(new Work.Quiet<Iterable<ILock>>() {
+ return storage.read(new Work.Quiet<Iterable<ILock>>() {
@Override
public Iterable<ILock> apply(StoreProvider storeProvider) {
return storeProvider.getLockStore().fetchLocks();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index a835eaa..8e6f4e7 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -219,7 +219,7 @@ public interface MaintenanceController {
@Override
public MaintenanceMode getMode(final String host) {
- return storage.weaklyConsistentRead(new Work.Quiet<MaintenanceMode>() {
+ return storage.read(new Work.Quiet<MaintenanceMode>() {
@Override
public MaintenanceMode apply(StoreProvider storeProvider) {
return storeProvider.getAttributeStore().getHostAttributes(host)
@@ -232,7 +232,7 @@ public interface MaintenanceController {
@Override
public Set<HostStatus> getStatus(final Set<String> hosts) {
- return storage.weaklyConsistentRead(new Work.Quiet<Set<HostStatus>>() {
+ return storage.read(new Work.Quiet<Set<HostStatus>>() {
@Override
public Set<HostStatus> apply(StoreProvider storeProvider) {
// Warning - this is filtering _all_ host attributes. If using this to frequently query
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
index c30a8c9..e5c0322 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
@@ -53,7 +53,7 @@ public class ResourceCounter {
private Iterable<ITaskConfig> getTasks(Query.Builder query) throws StorageException {
return Iterables.transform(
- Storage.Util.consistentFetchTasks(storage, query),
+ Storage.Util.fetchTasks(storage, query),
Tasks.SCHEDULED_TO_INFO);
}
@@ -91,7 +91,7 @@ public class ResourceCounter {
* @throws StorageException if there was a problem fetching quotas from storage.
*/
public Metric computeQuotaAllocationTotals() throws StorageException {
- return storage.weaklyConsistentRead(new Work.Quiet<Metric>() {
+ return storage.read(new Work.Quiet<Metric>() {
@Override
public Metric apply(StoreProvider storeProvider) {
Metric allocation = new Metric();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
index 0d02207..07d81e4 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
@@ -110,17 +110,9 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage {
}
@Override
- public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
+ public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
checkInState(State.READY);
- return wrapped.consistentRead(work);
- }
-
- @Override
- public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
- throws StorageException, E {
-
- checkInState(State.READY);
- return wrapped.weaklyConsistentRead(work);
+ return wrapped.read(work);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/ReadWriteLockManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/ReadWriteLockManager.java b/src/main/java/org/apache/aurora/scheduler/storage/ReadWriteLockManager.java
deleted file mode 100644
index 4e6d68b..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/ReadWriteLockManager.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage;
-
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.google.common.base.Preconditions;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A lock manager that wraps a ReadWriteLock and detects ill-fated attempts to upgrade
- * a read-locked thread to a write-locked thread, which would otherwise deadlock.
- */
-public class ReadWriteLockManager {
- private final ReentrantReadWriteLock managedLock = new ReentrantReadWriteLock();
-
- private enum LockMode {
- NONE,
- READ,
- WRITE
- }
-
- public enum LockType {
- READ(LockMode.READ),
- WRITE(LockMode.WRITE);
-
- private LockMode mode;
-
- private LockType(LockMode mode) {
- this.mode = mode;
- }
-
- LockMode getMode() {
- return mode;
- }
- }
-
- private static class LockState {
- private LockMode initialLockMode = LockMode.NONE;
- private int lockCount = 0;
-
- private boolean lockAcquired(LockMode mode) {
- boolean stateChanged = false;
- if (initialLockMode == LockMode.NONE) {
- initialLockMode = mode;
- stateChanged = true;
- }
- if (initialLockMode.equals(mode)) {
- lockCount++;
- }
- return stateChanged;
- }
-
- private void lockReleased(LockMode mode) {
- if (initialLockMode.equals(mode)) {
- lockCount--;
- if (lockCount == 0) {
- initialLockMode = LockMode.NONE;
- }
- }
- }
- }
-
- private final ThreadLocal<LockState> lockState = new ThreadLocal<LockState>() {
- @Override
- protected LockState initialValue() {
- return new LockState();
- }
- };
-
- /**
- * Blocks until this thread has acquired the requested lock.
- *
- * @param type Type of lock to acquire.
- * @return {@code true} if the lock was newly-acquired, or {@code false} if this thread previously
- * secured the lock and has yet to release it.
- */
- public boolean lock(LockType type) {
- requireNonNull(type);
-
- if (LockType.READ == type) {
- managedLock.readLock().lock();
- } else {
- Preconditions.checkState(lockState.get().initialLockMode != LockMode.READ,
- "A read operation may not be upgraded to a write operation.");
-
- managedLock.writeLock().lock();
- }
-
- return lockState.get().lockAcquired(type.getMode());
- }
-
- /**
- * Releases this thread's lock of the given type.
- *
- * @param type Type of lock to release.
- */
- public void unlock(LockType type) {
- requireNonNull(type);
-
- if (LockType.READ == type) {
- managedLock.readLock().unlock();
- } else {
- managedLock.writeLock().unlock();
- }
-
- lockState.get().lockReleased(type.getMode());
- }
-
- /**
- * Gets an approximation for the number of threads waiting to acquire the read or write lock.
- *
- * @see ReentrantReadWriteLock#getQueueLength()
- * @return The estimated number of threads waiting for this lock.
- */
- public int getQueueLength() {
- return managedLock.getQueueLength();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 682bca8..42233e5 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -22,7 +22,7 @@ import javax.inject.Qualifier;
import com.google.common.collect.ImmutableSet;
-import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Query.Builder;
import org.apache.aurora.scheduler.base.SchedulerException;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -175,8 +175,12 @@ public interface Storage {
}
/**
- * Executes the unit of read-only {@code work}. All data in the stores may be expected to be
- * consistent, as the invocation is mutually exclusive of any writes.
+ * Executes the unit of read-only {@code work}. The consistency model creates the possibility
+ * for a reader to read uncommitted state from a concurrent writer.
+ * <p>
+ * TODO(wfarner): Update this documentation once all stores are backed by
+ * {@link org.apache.aurora.scheduler.storage.db.DbStorage}, as the concurrency behavior will then
+ * be dictated by the {@link org.mybatis.guice.transactional.Transactional#isolation()} used.
*
* @param work The unit of work to execute.
* @param <T> The type of result this unit of work produces.
@@ -185,21 +189,7 @@ public interface Storage {
* @throws StorageException if there was a problem reading from stable storage.
* @throws E bubbled transparently when the unit of work throws
*/
- <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E;
-
- /**
- * Executes a unit of read-only {@code work}. This is functionally identical to
- * {@link #consistentRead(Work)} with the exception that data in the stores may not be fully
- * consistent.
- *
- * @param work The unit of work to execute.
- * @param <T> The type of result this unit of work produces.
- * @param <E> The type of exception this unit of work can throw.
- * @return the result when the unit of work completes successfully
- * @throws StorageException if there was a problem reading from stable storage.
- * @throws E bubbled transparently when the unit of work throws
- */
- <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work) throws StorageException, E;
+ <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E;
/**
* Executes the unit of mutating {@code work}.
@@ -234,7 +224,7 @@ public interface Storage {
*
* @param initializationLogic work to perform after this storage system is ready but before
* allowing general use of
- * {@link #consistentRead}.
+ * {@link #read}.
* @throws StorageException if there was a starting storage.
*/
void start(MutateWork.NoResult.Quiet initializationLogic) throws StorageException;
@@ -274,37 +264,13 @@ public interface Storage {
* Fetch tasks matching the query returned by {@code query} from {@code storage} in a
* read operation.
*
- * @see #consistentFetchTasks
+ * @see #fetchTasks
* @param storage Storage instance to query from.
* @param query Builder of the query to perform.
* @return Tasks returned from the query.
*/
- public static ImmutableSet<IScheduledTask> consistentFetchTasks(
- Storage storage,
- final Query.Builder query) {
-
- return storage.consistentRead(new Work.Quiet<ImmutableSet<IScheduledTask>>() {
- @Override
- public ImmutableSet<IScheduledTask> apply(StoreProvider storeProvider) {
- return storeProvider.getTaskStore().fetchTasks(query);
- }
- });
- }
-
- /**
- * Identical to {@link #consistentFetchTasks(Storage, Query.Builder)}, but fetches tasks using a
- * weakly-consistent read operation.
- *
- * @see #consistentFetchTasks
- * @param storage Storage instance to query from.
- * @param query Builder of the query to perform.
- * @return Tasks returned from the query.
- */
- public static ImmutableSet<IScheduledTask> weaklyConsistentFetchTasks(
- Storage storage,
- final Query.Builder query) {
-
- return storage.weaklyConsistentRead(new Work.Quiet<ImmutableSet<IScheduledTask>>() {
+ public static ImmutableSet<IScheduledTask> fetchTasks(Storage storage, final Builder query) {
+ return storage.read(new Work.Quiet<ImmutableSet<IScheduledTask>>() {
@Override
public ImmutableSet<IScheduledTask> apply(StoreProvider storeProvider) {
return storeProvider.getTaskStore().fetchTasks(query);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
index 503bdfe..2102adb 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -88,7 +88,7 @@ interface TemporaryStorage {
@Override
public Set<IScheduledTask> fetchTasks(final Query.Builder query) {
- return storage.consistentRead(new Work.Quiet<Set<IScheduledTask>>() {
+ return storage.read(new Work.Quiet<Set<IScheduledTask>>() {
@Override
public Set<IScheduledTask> apply(StoreProvider storeProvider) {
return storeProvider.getTaskStore().fetchTasks(query);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
index 40487e5..ea4afb4 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
@@ -46,14 +46,13 @@ import static java.util.Objects.requireNonNull;
/**
* A storage implementation backed by a relational database.
- *
- * <p>Delegates read and write concurrency semantics to the underlying database.
- * In this implementation, {@link #weaklyConsistentRead(Work)} and {@link #consistentRead(Work)}
- * have identical behaviour as they are both annotated by
- * {@link org.mybatis.guice.transactional.Transactional}. This class is currently only
- * partially implemented, with the underlying {@link MutableStoreProvider} only providing
- * a {@link LockStore.Mutable} implementation. It is designed to be a long term replacement
- * for {@link org.apache.aurora.scheduler.storage.mem.MemStorage}.</p>
+ * <p>
+ * Delegates read and write concurrency semantics to the underlying database.
+ * This class is currently only partially implemented, with the underlying
+ * {@link MutableStoreProvider} only providing some, but not all, store implementations. It is
+ * designed to be a long term replacement for
+ * {@link org.apache.aurora.scheduler.storage.mem.MemStorage}.
+ * </p>
*/
class DbStorage extends AbstractIdleService implements Storage {
@@ -123,19 +122,7 @@ class DbStorage extends AbstractIdleService implements Storage {
@Override
@Transactional
- public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
- try {
- return work.apply(storeProvider);
- } catch (PersistenceException e) {
- throw new StorageException(e.getMessage(), e);
- }
- }
-
- @Override
- @Transactional
- public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
- throws StorageException, E {
-
+ public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
try {
return work.apply(storeProvider);
} catch (PersistenceException e) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index 45ea50f..ba1672f 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -22,6 +22,7 @@ import java.util.Date;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -36,6 +37,7 @@ import com.twitter.common.base.Closure;
import com.twitter.common.inject.TimedInterceptor.Timed;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.SlidingStats;
import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
@@ -185,6 +187,7 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
private final QuotaStore.Mutable writeBehindQuotaStore;
private final AttributeStore.Mutable writeBehindAttributeStore;
private final JobUpdateStore.Mutable writeBehindJobUpdateStore;
+ private final ReentrantLock writeLock;
private StreamManager streamManager;
private final WriteAheadStorage writeAheadStorage;
@@ -196,22 +199,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
private boolean recovered = false;
private StreamTransaction transaction = null;
- /**
- * Identifies the grace period to give in-process snapshots and checkpoints to complete during
- * shutdown.
- */
- @Retention(RetentionPolicy.RUNTIME)
- @Target({ ElementType.PARAMETER, ElementType.METHOD })
- @Qualifier
- public @interface ShutdownGracePeriod { }
-
- /**
- * Identifies the interval between snapshots of local storage truncating the log.
- */
- @Retention(RetentionPolicy.RUNTIME)
- @Target({ ElementType.PARAMETER, ElementType.METHOD })
- @Qualifier
- public @interface SnapshotInterval { }
+ private final SlidingStats writerWaitStats =
+ new SlidingStats("log_storage_write_lock_wait", "ns");
/**
* Identifies a local storage layer that is written to only after first ensuring the write
@@ -229,9 +218,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
LogStorage(
LogManager logManager,
ShutdownRegistry shutdownRegistry,
- @ShutdownGracePeriod Amount<Long, Time> shutdownGracePeriod,
+ Settings settings,
SnapshotStore<Snapshot> snapshotStore,
- @SnapshotInterval Amount<Long, Time> snapshotInterval,
@WriteBehind Storage storage,
@WriteBehind SchedulerStore.Mutable schedulerStore,
@WriteBehind JobStore.Mutable jobStore,
@@ -240,12 +228,13 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
@WriteBehind QuotaStore.Mutable quotaStore,
@WriteBehind AttributeStore.Mutable attributeStore,
@WriteBehind JobUpdateStore.Mutable jobUpdateStore,
- EventSink eventSink) {
+ EventSink eventSink,
+ ReentrantLock writeLock) {
this(logManager,
- new ScheduledExecutorSchedulingService(shutdownRegistry, shutdownGracePeriod),
+ new ScheduledExecutorSchedulingService(shutdownRegistry, settings.getShutdownGracePeriod()),
snapshotStore,
- snapshotInterval,
+ settings.getSnapshotInterval(),
storage,
schedulerStore,
jobStore,
@@ -254,7 +243,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
quotaStore,
attributeStore,
jobUpdateStore,
- eventSink);
+ eventSink,
+ writeLock);
}
@VisibleForTesting
@@ -271,7 +261,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
QuotaStore.Mutable quotaStore,
AttributeStore.Mutable attributeStore,
JobUpdateStore.Mutable jobUpdateStore,
- EventSink eventSink) {
+ EventSink eventSink,
+ ReentrantLock writeLock) {
this.logManager = requireNonNull(logManager);
this.schedulingService = requireNonNull(schedulingService);
@@ -290,6 +281,7 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
this.writeBehindQuotaStore = requireNonNull(quotaStore);
this.writeBehindAttributeStore = requireNonNull(attributeStore);
this.writeBehindJobUpdateStore = requireNonNull(jobUpdateStore);
+ this.writeLock = requireNonNull(writeLock);
TransactionManager transactionManager = new TransactionManager() {
@Override
public boolean hasActiveTransaction() {
@@ -319,7 +311,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
@VisibleForTesting
final Map<LogEntry._Fields, Closure<LogEntry>> buildLogEntryReplayActions() {
-
return ImmutableMap.<LogEntry._Fields, Closure<LogEntry>>builder()
.put(LogEntry._Fields.SNAPSHOT, new Closure<LogEntry>() {
@Override
@@ -347,12 +338,12 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
public void execute(LogEntry item) throws RuntimeException {
// Nothing to do here
}
- }).build();
+ })
+ .build();
}
@VisibleForTesting
final Map<Op._Fields, Closure<Op>> buildTransactionReplayActions() {
-
return ImmutableMap.<Op._Fields, Closure<Op>>builder()
.put(Op._Fields.SAVE_FRAMEWORK_ID, new Closure<Op>() {
@Override
@@ -481,9 +472,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
} catch (IOException e) {
throw new IllegalStateException("Failed to open the log, cannot continue", e);
}
-
- // TODO(John Sirois): start incremental recovery here from the log and do a final recovery
- // catchup in start after shutting down the incremental syncer.
}
@Override
@@ -586,11 +574,11 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
Snapshot snapshot = snapshotStore.createSnapshot();
persist(snapshot);
LOG.info("Snapshot complete."
- + " host attrs: " + snapshot.getHostAttributesSize()
- + ", jobs: " + snapshot.getJobsSize()
- + ", locks: " + snapshot.getLocksSize()
- + ", quota confs: " + snapshot.getQuotaConfigurationsSize()
- + ", tasks: " + snapshot.getTasksSize());
+ + " host attrs: " + snapshot.getHostAttributesSize()
+ + ", jobs: " + snapshot.getJobsSize()
+ + ", locks: " + snapshot.getLocksSize()
+ + ", quota confs: " + snapshot.getQuotaConfigurationsSize()
+ + ", tasks: " + snapshot.getTasksSize());
}
});
}
@@ -603,16 +591,9 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
streamManager.snapshot(snapshot);
}
- @Override
- public synchronized <T, E extends Exception> T write(final MutateWork<T, E> work)
+ private <T, E extends Exception> T doInTransaction(final MutateWork<T, E> work)
throws StorageException, E {
- // We don't want to use the log when recovering from it, we just want to update the underlying
- // store - so pass mutations straight through to the underlying storage.
- if (!recovered) {
- return writeBehindStorage.write(work);
- }
-
// The log stream transaction has already been set up so we just need to delegate with our
// store provider so any mutations performed by work get logged.
if (transaction != null) {
@@ -643,15 +624,26 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
}
@Override
- public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
- return writeBehindStorage.consistentRead(work);
+ public <T, E extends Exception> T write(final MutateWork<T, E> work) throws StorageException, E {
+ long waitStart = System.nanoTime();
+ writeLock.lock();
+ try {
+ writerWaitStats.accumulate(System.nanoTime() - waitStart);
+ // We don't want to use the log when recovering from it, we just want to update the underlying
+ // store - so pass mutations straight through to the underlying storage.
+ if (!recovered) {
+ return writeBehindStorage.write(work);
+ }
+
+ return doInTransaction(work);
+ } finally {
+ writeLock.unlock();
+ }
}
@Override
- public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
- throws StorageException, E {
-
- return writeBehindStorage.weaklyConsistentRead(work);
+ public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
+ return writeBehindStorage.read(work);
}
@Override
@@ -666,4 +658,25 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
throw new StorageException("Failed to create a snapshot", e);
}
}
+
+ /**
+ * Configuration settings for log storage.
+ */
+ public static class Settings {
+ private final Amount<Long, Time> shutdownGracePeriod;
+ private final Amount<Long, Time> snapshotInterval;
+
+ public Settings(Amount<Long, Time> shutdownGracePeriod, Amount<Long, Time> snapshotInterval) {
+ this.shutdownGracePeriod = requireNonNull(shutdownGracePeriod);
+ this.snapshotInterval = requireNonNull(snapshotInterval);
+ }
+
+ public Amount<Long, Time> getShutdownGracePeriod() {
+ return shutdownGracePeriod;
+ }
+
+ public Amount<Long, Time> getSnapshotInterval() {
+ return snapshotInterval;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
index 73348f3..e515770 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
@@ -13,15 +13,12 @@
*/
package org.apache.aurora.scheduler.storage.log;
-import java.lang.annotation.Annotation;
-
import javax.inject.Singleton;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
-import com.google.inject.AbstractModule;
-import com.google.inject.Key;
+import com.google.inject.PrivateModule;
import com.google.inject.TypeLiteral;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.twitter.common.args.Arg;
@@ -32,10 +29,11 @@ import com.twitter.common.quantity.Time;
import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage;
import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
import org.apache.aurora.scheduler.storage.log.LogManager.DeduplicateSnapshots;
import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
-import org.apache.aurora.scheduler.storage.log.LogStorage.ShutdownGracePeriod;
-import org.apache.aurora.scheduler.storage.log.LogStorage.SnapshotInterval;
+import org.apache.aurora.scheduler.storage.log.LogStorage.Settings;
import static org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl;
import static org.apache.aurora.scheduler.storage.log.LogManager.DeflateSnapshots;
@@ -45,7 +43,7 @@ import static org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.Snaps
/**
* Bindings for scheduler distributed log based storage.
*/
-public class LogStorageModule extends AbstractModule {
+public class LogStorageModule extends PrivateModule {
@CmdLine(name = "dlog_shutdown_grace_period",
help = "Specifies the maximum time to wait for scheduled checkpoint and snapshot "
@@ -76,8 +74,8 @@ public class LogStorageModule extends AbstractModule {
@Override
protected void configure() {
- bindInterval(ShutdownGracePeriod.class, SHUTDOWN_GRACE_PERIOD);
- bindInterval(SnapshotInterval.class, SNAPSHOT_INTERVAL);
+ bind(Settings.class)
+ .toInstance(new Settings(SHUTDOWN_GRACE_PERIOD.get(), SNAPSHOT_INTERVAL.get()));
bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
.toInstance(MAX_LOG_ENTRY_SIZE.get());
@@ -88,6 +86,9 @@ public class LogStorageModule extends AbstractModule {
install(CallOrderEnforcingStorage.wrappingModule(LogStorage.class));
bind(DistributedSnapshotStore.class).to(LogStorage.class);
+ expose(Storage.class);
+ expose(NonVolatileStorage.class);
+ expose(DistributedSnapshotStore.class);
bind(EntrySerializer.class).to(EntrySerializerImpl.class);
// TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5
@@ -100,8 +101,4 @@ public class LogStorageModule extends AbstractModule {
.implement(StreamManager.class, StreamManagerImpl.class)
.build(StreamManagerFactory.class));
}
-
- private void bindInterval(Class<? extends Annotation> key, Arg<Amount<Long, Time>> value) {
- bind(Key.get(new TypeLiteral<Amount<Long, Time>>() { }, key)).toInstance(value.get());
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index 66ff567..ea33037 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -256,7 +256,7 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
@Timed("snapshot_create")
@Override
public Snapshot createSnapshot() {
- return storage.consistentRead(new Work.Quiet<Snapshot>() {
+ return storage.read(new Work.Quiet<Snapshot>() {
@Override
public Snapshot apply(StoreProvider storeProvider) {
Snapshot snapshot = new Snapshot();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
index 2cc76dc..bc3a7c5 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
@@ -17,30 +17,22 @@ import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import javax.inject.Qualifier;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Supplier;
-import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.twitter.common.inject.Bindings;
import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.stats.SlidingStats;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.stats.StatsProvider;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.JobStore;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.LockStore;
import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.ReadWriteLockManager;
-import org.apache.aurora.scheduler.storage.ReadWriteLockManager.LockType;
import org.apache.aurora.scheduler.storage.SchedulerStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.TaskStore;
@@ -50,27 +42,9 @@ import static java.util.Objects.requireNonNull;
/**
* A storage implementation comprised of individual in-memory store implementations.
- * <p>
- * This storage has a global read-write lock, which is used when invoking
- * {@link #consistentRead(Work)} and {@link #write(MutateWork)}. However, no locks are used at this
- * level for {@link #weaklyConsistentRead(Work)}. It is the responsibility of the
- * individual stores to ensure that read operations are thread-safe (optimally supporting
- * concurrency). Store implementations may assume that all methods invoked on {@code Mutable}
- * store interfaces are protected by the global write lock, and thus invoked serially.
*/
public class MemStorage implements Storage {
- private final AtomicLong readLockWaitNanos = Stats.exportLong("read_lock_wait_nanos");
- private final AtomicLong writeLockWaitNanos = Stats.exportLong("write_lock_wait_nanos");
-
- // We choose to not use the @Timed decorator for these stats since nested transactions are normal
- // and pollute the stats.
- private final SlidingStats readStats =
- new SlidingStats("mem_storage_consistent_read_operation", "nanos");
- private final SlidingStats writeStats =
- new SlidingStats("mem_storage_write_operation", "nanos");
-
private final MutableStoreProvider storeProvider;
- private final ReadWriteLockManager lockManager = new ReadWriteLockManager();
private final Storage delegatedStore;
/**
@@ -93,8 +67,7 @@ public class MemStorage implements Storage {
@Delegated final Storage delegated,
@Delegated final QuotaStore.Mutable quotaStore,
@Delegated final AttributeStore.Mutable attributeStore,
- @Delegated final JobUpdateStore.Mutable updateStore,
- StatsProvider statsProvider) {
+ @Delegated final JobUpdateStore.Mutable updateStore) {
this.delegatedStore = requireNonNull(delegated);
storeProvider = new MutableStoreProvider() {
@@ -138,91 +111,40 @@ public class MemStorage implements Storage {
return updateStore;
}
};
-
- statsProvider.makeGauge(
- THREADS_WAITING_GAUGE,
- new Supplier<Integer>() {
- @Override
- public Integer get() {
- return lockManager.getQueueLength();
- }
- }
- );
}
/**
- * Creates a new empty in-memory storage for use in testing, exporting gauges to the provided
- * stats provider. NOTE: Due to the fact that some libraries statically access {@link Stats},
- * not all guaranteed to use the stats provider.
+ * Creates a new empty in-memory storage for use in testing.
*/
@VisibleForTesting
- public static Storage newEmptyStorage(final StatsProvider statsProvider) {
+ public static Storage newEmptyStorage() {
Injector injector = Guice.createInjector(
DbModule.testModule(Bindings.annotatedKeyFactory(Delegated.class)),
- new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)),
- new AbstractModule() {
- @Override
- protected void configure() {
- bind(StatsProvider.class).toInstance(statsProvider);
- }
- });
-
+ new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)));
Storage storage = injector.getInstance(Key.get(Storage.class, Volatile.class));
storage.prepare();
return storage;
}
- /**
- * Creates a new empty in-memory storage for use in testing.
- */
- @VisibleForTesting
- public static Storage newEmptyStorage() {
- return newEmptyStorage(Stats.STATS_PROVIDER);
- }
-
- private <S extends StoreProvider, T, E extends Exception> T doWork(
- LockType lockType,
- S stores,
- StorageOperation<S, T, E> work,
- SlidingStats stats,
- AtomicLong lockWaitStat) throws StorageException, E {
-
- requireNonNull(work);
-
- // Perform the work, and only record stats for top-level transactions. This prevents
- // over-counting when nested transactions are performed.
- long lockStartNanos = System.nanoTime();
- boolean topLevelOperation = lockManager.lock(lockType);
- if (topLevelOperation) {
- lockWaitStat.addAndGet(System.nanoTime() - lockStartNanos);
- }
- try {
- return work.apply(stores);
- } finally {
- lockManager.unlock(lockType);
- if (topLevelOperation) {
- stats.accumulate(System.nanoTime() - lockStartNanos);
- }
- }
- }
-
+ @Timed("mem_storage_read_operation")
@Override
- public <T, E extends Exception> T consistentRead(final Work<T, E> work)
+ public <T, E extends Exception> T read(final Work<T, E> work)
throws StorageException, E {
- return delegatedStore.consistentRead(new Work<T, E>() {
+ return delegatedStore.read(new Work<T, E>() {
@Override
public T apply(StoreProvider provider) throws E {
- return doWork(LockType.READ, storeProvider, work, readStats, readLockWaitNanos);
+ return work.apply(storeProvider);
}
});
}
+ @Timed("mem_storage_write_operation")
@Override
public <T, E extends Exception> T write(final MutateWork<T, E> work) throws StorageException, E {
return delegatedStore.write(new MutateWork<T, E>() {
@Override
public T apply(MutableStoreProvider provider) throws E {
- return doWork(LockType.WRITE, storeProvider, work, writeStats, writeLockWaitNanos);
+ return work.apply(storeProvider);
}
});
}
@@ -231,18 +153,4 @@ public class MemStorage implements Storage {
public void prepare() throws StorageException {
delegatedStore.prepare();
}
-
- @Timed("mem_storage_weakly_consistent_read_operation")
- @Override
- public <T, E extends Exception> T weaklyConsistentRead(final Work<T, E> work)
- throws StorageException, E {
-
- return delegatedStore.weaklyConsistentRead(new Work<T, E>() {
- @Override
- public T apply(StoreProvider provider) throws E {
- return work.apply(storeProvider);
- }
- });
-
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index b66b916..a5e869f 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -497,9 +497,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
private List<ScheduledTask> getTasks(TaskQuery query) {
requireNonNull(query);
- Iterable<IScheduledTask> tasks =
- Storage.Util.weaklyConsistentFetchTasks(storage, Query.arbitrary(query));
-
+ Iterable<IScheduledTask> tasks = Storage.Util.fetchTasks(storage, Query.arbitrary(query));
if (query.isSetOffset()) {
tasks = Iterables.skip(tasks, query.getOffset());
}
@@ -558,7 +556,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
IJobKey jobKey = JobKeys.assertValid(IJobKey.build(job));
Set<IScheduledTask> activeTasks =
- Storage.Util.weaklyConsistentFetchTasks(storage, Query.jobScoped(jobKey).active());
+ Storage.Util.fetchTasks(storage, Query.jobScoped(jobKey).active());
Iterable<IAssignedTask> assignedTasks =
Iterables.transform(activeTasks, Tasks.SCHEDULED_TO_ASSIGNED);
@@ -578,7 +576,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
@Override
public Response getRoleSummary() {
Multimap<String, IJobKey> jobsByRole = mapByRole(
- Storage.Util.weaklyConsistentFetchTasks(storage, Query.unscoped()),
+ Storage.Util.fetchTasks(storage, Query.unscoped()),
Tasks.SCHEDULED_TO_JOB_KEY);
Multimap<String, IJobKey> cronJobsByRole = mapByRole(
@@ -674,7 +672,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
}
private Multimap<IJobKey, IScheduledTask> getTasks(Query.Builder query) {
- return Tasks.byJobKey(Storage.Util.weaklyConsistentFetchTasks(storage, query));
+ return Tasks.byJobKey(Storage.Util.fetchTasks(storage, query));
}
private static <T> Multimap<String, IJobKey> mapByRole(
@@ -1538,7 +1536,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
final IJobUpdateQuery query = IJobUpdateQuery.build(requireNonNull(mutableQuery));
return okResponse(Result.getJobUpdateSummariesResult(
new GetJobUpdateSummariesResult().setUpdateSummaries(IJobUpdateSummary.toBuildersList(
- storage.weaklyConsistentRead(new Work.Quiet<List<IJobUpdateSummary>>() {
+ storage.read(new Work.Quiet<List<IJobUpdateSummary>>() {
@Override
public List<IJobUpdateSummary> apply(StoreProvider storeProvider) {
return storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(query);
@@ -1550,7 +1548,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
public Response getJobUpdateDetails(final String updateId) {
requireNonNull(updateId);
Optional<IJobUpdateDetails> details =
- storage.weaklyConsistentRead(new Work.Quiet<Optional<IJobUpdateDetails>>() {
+ storage.read(new Work.Quiet<Optional<IJobUpdateDetails>>() {
@Override
public Optional<IJobUpdateDetails> apply(StoreProvider storeProvider) {
return storeProvider.getJobUpdateStore().fetchJobUpdateDetails(updateId);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
index 5cc3b41..3336f8c 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
@@ -56,14 +56,7 @@ class FakeNonVolatileStorage implements NonVolatileStorage {
}
@Override
- public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
- throws StorageException, E {
-
- return delegate.weaklyConsistentRead(work);
- }
-
- @Override
- public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
- return delegate.consistentRead(work);
+ public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
+ return delegate.read(work);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
index d56c36c..934e9bb 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
@@ -211,7 +211,7 @@ public class CronJobManagerImplTest extends EasyMockTest {
}
private Optional<IJobConfiguration> fetchFromStorage() {
- return storage.consistentRead(new Storage.Work.Quiet<Optional<IJobConfiguration>>() {
+ return storage.read(new Storage.Work.Quiet<Optional<IJobConfiguration>>() {
@Override
public Optional<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
return storeProvider.getJobStore().fetchJob(cronJobManager.getManagerKey(),
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index b60dd9c..265c38d 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -43,14 +43,9 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work.Quiet;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
import org.easymock.IExpectationSetters;
import org.junit.Before;
import org.junit.Test;
@@ -103,35 +98,15 @@ public class SchedulingFilterImplTest extends EasyMockTest {
private final AtomicLong taskIdCounter = new AtomicLong();
private SchedulingFilter defaultFilter;
- private Storage storage;
- private StoreProvider storeProvider;
private AttributeStore.Mutable attributeStore;
@Before
public void setUp() {
- storage = createMock(Storage.class);
defaultFilter = new SchedulingFilterImpl();
- storeProvider = createMock(StoreProvider.class);
attributeStore = createMock(AttributeStore.Mutable.class);
emptyJob = new AttributeAggregate(
Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
attributeStore);
-
- // Link the store provider to the store mocks.
- expectReads();
-
- expect(storeProvider.getAttributeStore()).andReturn(attributeStore).anyTimes();
- }
-
- private void expectReads() {
- expect(storage.weaklyConsistentRead(EasyMock.<Quiet<Object>>anyObject()))
- .andAnswer(new IAnswer<Object>() {
- @Override
- public Object answer() {
- Quiet<?> arg = (Quiet<?>) EasyMock.getCurrentArguments()[0];
- return arg.apply(storeProvider);
- }
- }).anyTimes();
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
index 953fb9e..2f14205 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
@@ -173,7 +173,7 @@ public class LockManagerImplTest extends EasyMockTest {
final CountDownLatch reads = new CountDownLatch(2);
EasyMock.makeThreadSafe(storageUtil.storage, false);
- expect(storageUtil.storage.consistentRead(EasyMock.<Work<Object, ?>>anyObject()))
+ expect(storageUtil.storage.read(EasyMock.<Work<Object, ?>>anyObject()))
.andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index 157921c..03b1c9b 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -183,7 +183,7 @@ public class StateManagerImplTest extends EasyMockTest {
.setTaskId(taskId)
.setTask(task.newBuilder()));
assertEquals(ImmutableSet.of(IScheduledTask.build(expected)),
- Storage.Util.consistentFetchTasks(storage, Query.taskScoped(taskId)));
+ Storage.Util.fetchTasks(storage, Query.taskScoped(taskId)));
}
@Test
@@ -332,7 +332,7 @@ public class StateManagerImplTest extends EasyMockTest {
changeState(taskId, RUNNING);
changeState(taskId, FAILED);
IScheduledTask rescheduledTask = Iterables.getOnlyElement(
- Storage.Util.consistentFetchTasks(storage, Query.taskScoped(taskId2)));
+ Storage.Util.fetchTasks(storage, Query.taskScoped(taskId2)));
assertEquals(taskId, rescheduledTask.getAncestorId());
assertEquals(1, rescheduledTask.getFailureCount());
}
@@ -410,7 +410,7 @@ public class StateManagerImplTest extends EasyMockTest {
assignTask(taskId, HOST_A, ImmutableSet.of(80, 81, 82));
IScheduledTask actual = Iterables.getOnlyElement(
- Storage.Util.consistentFetchTasks(storage, Query.taskScoped(taskId)));
+ Storage.Util.fetchTasks(storage, Query.taskScoped(taskId)));
assertEquals(
requestedPorts,
@@ -442,7 +442,7 @@ public class StateManagerImplTest extends EasyMockTest {
assignTask(newTaskId, HOST_A, ImmutableSet.of(86));
IScheduledTask actual = Iterables.getOnlyElement(
- Storage.Util.consistentFetchTasks(storage, Query.taskScoped(newTaskId)));
+ Storage.Util.fetchTasks(storage, Query.taskScoped(newTaskId)));
assertEquals(ImmutableMap.of("one", 86), actual.getAssignedTask().getAssignedPorts());
}
@@ -471,7 +471,7 @@ public class StateManagerImplTest extends EasyMockTest {
control.replay();
insertTask(task, 0);
- Iterables.getOnlyElement(Storage.Util.consistentFetchTasks(storage, Query.taskScoped(taskId)));
+ Iterables.getOnlyElement(Storage.Util.fetchTasks(storage, Query.taskScoped(taskId)));
insertTask(task, 0);
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/ReadWriteLockManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/ReadWriteLockManagerTest.java b/src/test/java/org/apache/aurora/scheduler/storage/ReadWriteLockManagerTest.java
deleted file mode 100644
index b63ff3e..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/ReadWriteLockManagerTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import com.google.common.testing.TearDown;
-import com.google.common.testing.junit4.TearDownTestCase;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.scheduler.storage.ReadWriteLockManager.LockType.READ;
-import static org.apache.aurora.scheduler.storage.ReadWriteLockManager.LockType.WRITE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class ReadWriteLockManagerTest extends TearDownTestCase {
-
- private ReadWriteLockManager lockManager;
- private ExecutorService executor;
-
- @Before
- public void setUp() {
- lockManager = new ReadWriteLockManager();
- executor = Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("LockManagerTest-%d").setDaemon(true).build());
- addTearDown(new TearDown() {
- @Override
- public void tearDown() {
- new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
- }
- });
- }
-
- @Test
- public void testModeDowngrade() {
- lockManager.lock(WRITE);
- lockManager.lock(READ);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testModeUpgrade() {
- lockManager.lock(READ);
- lockManager.lock(WRITE);
- }
-
- @Test
- public void testSimultaneousReads() throws Exception {
- final CountDownLatch slowReadStarted = new CountDownLatch(1);
- final CountDownLatch fastReadFinished = new CountDownLatch(1);
-
- Future<String> slowReadResult = executor.submit(new Callable<String>() {
- @Override
- public String call() throws Exception {
- lockManager.lock(READ);
- slowReadStarted.countDown();
- fastReadFinished.await();
- lockManager.unlock(READ);
- return "slow";
- }
- });
-
- slowReadStarted.await();
- lockManager.lock(READ);
- lockManager.unlock(READ);
- fastReadFinished.countDown();
- assertEquals("slow", slowReadResult.get());
- }
-
- @Test
- public void testReentrantReadLock() {
- assertTrue(lockManager.lock(READ));
- assertFalse(lockManager.lock(READ));
- lockManager.unlock(READ);
- lockManager.unlock(READ);
- assertTrue(lockManager.lock(READ));
- }
-
- @Test
- public void testReentrantWriteLock() {
- assertTrue(lockManager.lock(WRITE));
- assertFalse(lockManager.lock(WRITE));
- lockManager.unlock(WRITE);
- lockManager.unlock(WRITE);
- assertTrue(lockManager.lock(WRITE));
- assertFalse(lockManager.lock(READ));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
index 2514d84..64c1b7f 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
@@ -185,7 +185,7 @@ public class StorageBackfillTest {
backfill();
IJobConfiguration actual = Iterables.getOnlyElement(
- storage.consistentRead(new Storage.Work.Quiet<Iterable<IJobConfiguration>>() {
+ storage.read(new Storage.Work.Quiet<Iterable<IJobConfiguration>>() {
@Override
public Iterable<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
return storeProvider.getJobStore().fetchJobs("CRON");
@@ -235,7 +235,7 @@ public class StorageBackfillTest {
ImmutableSet.of(
IScheduledTask.build(noJobKeyBackfilled),
IScheduledTask.build(nullJobKeyFieldsBackfilled)),
- Storage.Util.consistentFetchTasks(storage, Query.unscoped()));
+ Storage.Util.fetchTasks(storage, Query.unscoped()));
}
private void backfill() {
@@ -280,12 +280,12 @@ public class StorageBackfillTest {
}
private IScheduledTask getTask(String taskId) {
- return Iterables.getOnlyElement(Storage.Util.consistentFetchTasks(
+ return Iterables.getOnlyElement(Storage.Util.fetchTasks(
storage,
Query.taskScoped(taskId)));
}
private Set<IScheduledTask> getTasksByStatus(ScheduleStatus status) {
- return Storage.Util.consistentFetchTasks(storage, Query.unscoped().byStatus(status));
+ return Storage.Util.fetchTasks(storage, Query.unscoped().byStatus(status));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
index c0ccfaa..ca7c0c2 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
@@ -658,7 +658,7 @@ public class DBJobUpdateStoreTest {
}
private Optional<IJobUpdate> getUpdate(final String updateId) {
- return storage.consistentRead(new Quiet<Optional<IJobUpdate>>() {
+ return storage.read(new Quiet<Optional<IJobUpdate>>() {
@Override
public Optional<IJobUpdate> apply(Storage.StoreProvider storeProvider) {
return storeProvider.getJobUpdateStore().fetchJobUpdate(updateId);
@@ -667,7 +667,7 @@ public class DBJobUpdateStoreTest {
}
private List<IJobInstanceUpdateEvent> getInstanceEvents(final String updateId, final int id) {
- return storage.consistentRead(new Quiet<List<IJobInstanceUpdateEvent>>() {
+ return storage.read(new Quiet<List<IJobInstanceUpdateEvent>>() {
@Override
public List<IJobInstanceUpdateEvent> apply(Storage.StoreProvider storeProvider) {
return storeProvider.getJobUpdateStore().fetchInstanceEvents(updateId, id);
@@ -676,7 +676,7 @@ public class DBJobUpdateStoreTest {
}
private Optional<IJobUpdateInstructions> getUpdateInstructions(final String updateId) {
- return storage.consistentRead(new Quiet<Optional<IJobUpdateInstructions>>() {
+ return storage.read(new Quiet<Optional<IJobUpdateInstructions>>() {
@Override
public Optional<IJobUpdateInstructions> apply(Storage.StoreProvider storeProvider) {
return storeProvider.getJobUpdateStore().fetchJobUpdateInstructions(updateId);
@@ -685,7 +685,7 @@ public class DBJobUpdateStoreTest {
}
private Optional<IJobUpdateDetails> getUpdateDetails(final String updateId) {
- return storage.consistentRead(new Quiet<Optional<IJobUpdateDetails>>() {
+ return storage.read(new Quiet<Optional<IJobUpdateDetails>>() {
@Override
public Optional<IJobUpdateDetails> apply(Storage.StoreProvider storeProvider) {
return storeProvider.getJobUpdateStore().fetchJobUpdateDetails(updateId);
@@ -694,7 +694,7 @@ public class DBJobUpdateStoreTest {
}
private Set<StoredJobUpdateDetails> getAllUpdateDetails() {
- return storage.consistentRead(new Quiet<Set<StoredJobUpdateDetails>>() {
+ return storage.read(new Quiet<Set<StoredJobUpdateDetails>>() {
@Override
public Set<StoredJobUpdateDetails> apply(Storage.StoreProvider storeProvider) {
return storeProvider.getJobUpdateStore().fetchAllJobUpdateDetails();
@@ -703,7 +703,7 @@ public class DBJobUpdateStoreTest {
}
private List<IJobUpdateSummary> getSummaries(final JobUpdateQuery query) {
- return storage.consistentRead(new Quiet<List<IJobUpdateSummary>>() {
+ return storage.read(new Quiet<List<IJobUpdateSummary>>() {
@Override
public List<IJobUpdateSummary> apply(Storage.StoreProvider storeProvider) {
return storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/db/DbAttributeStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbAttributeStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbAttributeStoreTest.java
index d659658..db508ca 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbAttributeStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbAttributeStoreTest.java
@@ -140,7 +140,7 @@ public class DbAttributeStoreTest {
}
private Optional<IHostAttributes> read(final String host) {
- return storage.consistentRead(new Work.Quiet<Optional<IHostAttributes>>() {
+ return storage.read(new Work.Quiet<Optional<IHostAttributes>>() {
@Override
public Optional<IHostAttributes> apply(StoreProvider storeProvider) {
return storeProvider.getAttributeStore().getHostAttributes(host);
@@ -149,7 +149,7 @@ public class DbAttributeStoreTest {
}
private Set<IHostAttributes> readAll() {
- return storage.consistentRead(new Work.Quiet<Set<IHostAttributes>>() {
+ return storage.read(new Work.Quiet<Set<IHostAttributes>>() {
@Override
public Set<IHostAttributes> apply(StoreProvider storeProvider) {
return storeProvider.getAttributeStore().getHostAttributes();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5116c220/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java
index e9b210f..d6140f5 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbLockStoreTest.java
@@ -44,7 +44,7 @@ public class DbLockStoreTest {
private void assertLocks(final ILock... expected) {
assertEquals(
ImmutableSet.<ILock>builder().add(expected).build(),
- storage.consistentRead(new Quiet<Set<ILock>>() {
+ storage.read(new Quiet<Set<ILock>>() {
@Override
public Set<ILock> apply(Storage.StoreProvider storeProvider) {
return storeProvider.getLockStore().fetchLocks();
@@ -53,7 +53,7 @@ public class DbLockStoreTest {
}
private Optional<ILock> getLock(final ILockKey key) {
- return storage.consistentRead(new Quiet<Optional<ILock>>() {
+ return storage.read(new Quiet<Optional<ILock>>() {
@Override
public Optional<ILock> apply(StoreProvider storeProvider) {
return storeProvider.getLockStore().fetchLock(key);