You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/12/03 03:59:10 UTC
[1/4] aurora git commit: Extract a storage Persistence layer
Repository: aurora
Updated Branches:
refs/heads/master de8b37549 -> cea43db9d
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
deleted file mode 100644
index 3c056c9..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ /dev/null
@@ -1,897 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-
-import org.apache.aurora.codec.ThriftBinaryCodec;
-import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Data;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.InstanceTaskConfig;
-import org.apache.aurora.gen.JobConfiguration;
-import org.apache.aurora.gen.JobInstanceUpdateEvent;
-import org.apache.aurora.gen.JobUpdate;
-import org.apache.aurora.gen.JobUpdateAction;
-import org.apache.aurora.gen.JobUpdateEvent;
-import org.apache.aurora.gen.JobUpdateInstructions;
-import org.apache.aurora.gen.JobUpdateKey;
-import org.apache.aurora.gen.JobUpdateSettings;
-import org.apache.aurora.gen.JobUpdateStatus;
-import org.apache.aurora.gen.JobUpdateSummary;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.Range;
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.storage.DeduplicatedSnapshot;
-import org.apache.aurora.gen.storage.LogEntry;
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
-import org.apache.aurora.gen.storage.RemoveJob;
-import org.apache.aurora.gen.storage.RemoveJobUpdates;
-import org.apache.aurora.gen.storage.RemoveQuota;
-import org.apache.aurora.gen.storage.RemoveTasks;
-import org.apache.aurora.gen.storage.SaveCronJob;
-import org.apache.aurora.gen.storage.SaveFrameworkId;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
-import org.apache.aurora.gen.storage.SaveJobUpdate;
-import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
-import org.apache.aurora.gen.storage.SaveQuota;
-import org.apache.aurora.gen.storage.SaveTasks;
-import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.gen.storage.Transaction;
-import org.apache.aurora.gen.storage.storageConstants;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.log.Log;
-import org.apache.aurora.scheduler.log.Log.Entry;
-import org.apache.aurora.scheduler.log.Log.Position;
-import org.apache.aurora.scheduler.log.Log.Stream;
-import org.apache.aurora.scheduler.resources.ResourceTestUtil;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.log.LogStorage.SchedulingService;
-import org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl;
-import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher;
-import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher.StreamMatcher;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.easymock.Capture;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.Resource.diskMb;
-import static org.apache.aurora.gen.Resource.numCpus;
-import static org.apache.aurora.gen.Resource.ramMb;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.makeConfig;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.notNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class LogStorageTest extends EasyMockTest {
-
- private static final Amount<Long, Time> SNAPSHOT_INTERVAL = Amount.of(1L, Time.MINUTES);
- private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "name");
- private static final IJobUpdateKey UPDATE_ID =
- IJobUpdateKey.build(new JobUpdateKey(JOB_KEY.newBuilder(), "testUpdateId"));
- private static final long NOW = 42;
-
- private LogStorage logStorage;
- private Log log;
- private SnapshotDeduplicator deduplicator;
- private Stream stream;
- private Position position;
- private StreamMatcher streamMatcher;
- private SchedulingService schedulingService;
- private SnapshotStore<Snapshot> snapshotStore;
- private StorageTestUtil storageUtil;
- private EventSink eventSink;
-
- @Before
- public void setUp() {
- log = createMock(Log.class);
- deduplicator = createMock(SnapshotDeduplicator.class);
-
- StreamManagerFactory streamManagerFactory = logStream -> {
- HashFunction md5 = Hashing.md5();
- return new StreamManagerImpl(
- logStream,
- new EntrySerializer.EntrySerializerImpl(Amount.of(1, Data.GB), md5),
- md5,
- deduplicator);
- };
- LogManager logManager = new LogManager(log, streamManagerFactory);
-
- schedulingService = createMock(SchedulingService.class);
- snapshotStore = createMock(new Clazz<SnapshotStore<Snapshot>>() { });
- storageUtil = new StorageTestUtil(this);
- eventSink = createMock(EventSink.class);
-
- logStorage = new LogStorage(
- logManager,
- schedulingService,
- snapshotStore,
- SNAPSHOT_INTERVAL,
- storageUtil.storage,
- storageUtil.schedulerStore,
- storageUtil.jobStore,
- storageUtil.taskStore,
- storageUtil.quotaStore,
- storageUtil.attributeStore,
- storageUtil.jobUpdateStore,
- eventSink,
- new ReentrantLock(),
- TaskTestUtil.THRIFT_BACKFILL);
-
- stream = createMock(Stream.class);
- streamMatcher = LogOpMatcher.matcherFor(stream);
- position = createMock(Position.class);
-
- storageUtil.storage.prepare();
- }
-
- @Test
- public void testStart() throws Exception {
- // We should open the log and arrange for its clean shutdown.
- expect(log.open()).andReturn(stream);
-
- // Our start should recover the log and then forward to the underlying storage start of the
- // supplied initialization logic.
- AtomicBoolean initialized = new AtomicBoolean(false);
- MutateWork.NoResult.Quiet initializationLogic = provider -> {
- // Creating a mock and expecting apply(storeProvider) does not work here for whatever
- // reason.
- initialized.set(true);
- };
-
- Capture<MutateWork.NoResult.Quiet> recoverAndInitializeWork = createCapture();
- storageUtil.storage.write(capture(recoverAndInitializeWork));
- expectLastCall().andAnswer(() -> {
- recoverAndInitializeWork.getValue().apply(storageUtil.mutableStoreProvider);
- return null;
- });
-
- Capture<MutateWork<Void, RuntimeException>> recoveryWork = createCapture();
- expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer(
- () -> {
- recoveryWork.getValue().apply(storageUtil.mutableStoreProvider);
- return null;
- });
-
- Capture<MutateWork<Void, RuntimeException>> initializationWork = createCapture();
- expect(storageUtil.storage.write(capture(initializationWork))).andAnswer(
- () -> {
- initializationWork.getValue().apply(storageUtil.mutableStoreProvider);
- return null;
- });
-
- // We should perform a snapshot when the snapshot thread runs.
- Capture<Runnable> snapshotAction = createCapture();
- schedulingService.doEvery(eq(SNAPSHOT_INTERVAL), capture(snapshotAction));
- Snapshot snapshotContents = new Snapshot()
- .setTimestamp(NOW)
- .setTasks(ImmutableSet.of(makeTask("task_id", TaskTestUtil.JOB).newBuilder()));
- expect(snapshotStore.createSnapshot()).andReturn(snapshotContents);
- DeduplicatedSnapshot deduplicated =
- new SnapshotDeduplicatorImpl().deduplicate(snapshotContents);
- expect(deduplicator.deduplicate(snapshotContents)).andReturn(deduplicated);
- streamMatcher.expectSnapshot(deduplicated).andReturn(position);
- stream.truncateBefore(position);
- Capture<MutateWork<Void, RuntimeException>> snapshotWork = createCapture();
- expect(storageUtil.storage.write(capture(snapshotWork))).andAnswer(
- () -> {
- snapshotWork.getValue().apply(storageUtil.mutableStoreProvider);
- return null;
- }).anyTimes();
-
- // Populate all LogEntry types.
- buildReplayLogEntries();
-
- control.replay();
-
- logStorage.prepare();
- logStorage.start(initializationLogic);
- assertTrue(initialized.get());
-
- // Run the snapshot thread.
- snapshotAction.getValue().run();
-
- // Assert all LogEntry types have handlers defined.
- // Our current StreamManagerImpl.readFromBeginning() does not let some entries escape
- // the decoding routine making handling them in replay unnecessary.
- assertEquals(
- Sets.complementOf(EnumSet.of(
- LogEntry._Fields.FRAME,
- LogEntry._Fields.DEDUPLICATED_SNAPSHOT,
- LogEntry._Fields.DEFLATED_ENTRY)),
- EnumSet.copyOf(logStorage.buildLogEntryReplayActions().keySet()));
-
- // Assert all Transaction types have handlers defined.
- assertEquals(
- EnumSet.allOf(Op._Fields.class),
- EnumSet.copyOf(logStorage.buildTransactionReplayActions().keySet()));
- }
-
- private void buildReplayLogEntries() throws Exception {
- ImmutableSet.Builder<LogEntry> builder = ImmutableSet.builder();
-
- builder.add(createTransaction(Op.saveFrameworkId(new SaveFrameworkId("bob"))));
- storageUtil.schedulerStore.saveFrameworkId("bob");
-
- JobConfiguration actualJob = new JobConfiguration().setTaskConfig(nonBackfilledConfig());
- JobConfiguration expectedJob =
- new JobConfiguration().setTaskConfig(makeConfig(JOB_KEY).newBuilder());
- SaveCronJob cronJob = new SaveCronJob().setJobConfig(actualJob);
- builder.add(createTransaction(Op.saveCronJob(cronJob)));
- storageUtil.jobStore.saveAcceptedJob(IJobConfiguration.build(expectedJob));
-
- RemoveJob removeJob = new RemoveJob(JOB_KEY.newBuilder());
- builder.add(createTransaction(Op.removeJob(removeJob)));
- storageUtil.jobStore.removeJob(JOB_KEY);
-
- ScheduledTask actualTask = makeTask("id", JOB_KEY).newBuilder();
- actualTask.getAssignedTask().setTask(nonBackfilledConfig());
- IScheduledTask expectedTask = makeTask("id", JOB_KEY);
- SaveTasks saveTasks = new SaveTasks(ImmutableSet.of(actualTask));
- builder.add(createTransaction(Op.saveTasks(saveTasks)));
- storageUtil.taskStore.saveTasks(ImmutableSet.of(expectedTask));
-
- RemoveTasks removeTasks = new RemoveTasks(ImmutableSet.of("taskId1"));
- builder.add(createTransaction(Op.removeTasks(removeTasks)));
- storageUtil.taskStore.deleteTasks(removeTasks.getTaskIds());
-
- ResourceAggregate nonBackfilled = new ResourceAggregate()
- .setNumCpus(1.0)
- .setRamMb(32)
- .setDiskMb(64);
- SaveQuota saveQuota = new SaveQuota(JOB_KEY.getRole(), nonBackfilled);
- builder.add(createTransaction(Op.saveQuota(saveQuota)));
- storageUtil.quotaStore.saveQuota(
- saveQuota.getRole(),
- IResourceAggregate.build(nonBackfilled.deepCopy()
- .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))));
-
- builder.add(createTransaction(Op.removeQuota(new RemoveQuota(JOB_KEY.getRole()))));
- storageUtil.quotaStore.removeQuota(JOB_KEY.getRole());
-
- // This entry lacks a slave ID, and should therefore be discarded.
- SaveHostAttributes hostAttributes1 = new SaveHostAttributes(new HostAttributes()
- .setHost("host1")
- .setMode(MaintenanceMode.DRAINED));
- builder.add(createTransaction(Op.saveHostAttributes(hostAttributes1)));
-
- SaveHostAttributes hostAttributes2 = new SaveHostAttributes(new HostAttributes()
- .setHost("host2")
- .setSlaveId("slave2")
- .setMode(MaintenanceMode.DRAINED));
- builder.add(createTransaction(Op.saveHostAttributes(hostAttributes2)));
- expect(storageUtil.attributeStore.saveHostAttributes(
- IHostAttributes.build(hostAttributes2.getHostAttributes()))).andReturn(true);
-
- JobUpdate actualUpdate = new JobUpdate()
- .setSummary(new JobUpdateSummary().setKey(UPDATE_ID.newBuilder()))
- .setInstructions(new JobUpdateInstructions()
- .setInitialState(
- ImmutableSet.of(new InstanceTaskConfig().setTask(nonBackfilledConfig())))
- .setDesiredState(new InstanceTaskConfig().setTask(nonBackfilledConfig())));
- JobUpdate expectedUpdate = actualUpdate.deepCopy();
- expectedUpdate.getInstructions().getDesiredState().setTask(makeConfig(JOB_KEY).newBuilder());
- expectedUpdate.getInstructions().getInitialState()
- .forEach(e -> e.setTask(makeConfig(JOB_KEY).newBuilder()));
- SaveJobUpdate saveUpdate = new SaveJobUpdate().setJobUpdate(actualUpdate);
- builder.add(createTransaction(Op.saveJobUpdate(saveUpdate)));
- storageUtil.jobUpdateStore.saveJobUpdate(IJobUpdate.build(expectedUpdate));
-
- SaveJobUpdateEvent saveUpdateEvent =
- new SaveJobUpdateEvent(new JobUpdateEvent(), UPDATE_ID.newBuilder());
- builder.add(createTransaction(Op.saveJobUpdateEvent(saveUpdateEvent)));
- storageUtil.jobUpdateStore.saveJobUpdateEvent(
- UPDATE_ID,
- IJobUpdateEvent.build(saveUpdateEvent.getEvent()));
-
- SaveJobInstanceUpdateEvent saveInstanceEvent = new SaveJobInstanceUpdateEvent(
- new JobInstanceUpdateEvent(),
- UPDATE_ID.newBuilder());
- builder.add(createTransaction(Op.saveJobInstanceUpdateEvent(saveInstanceEvent)));
- storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
- UPDATE_ID,
- IJobInstanceUpdateEvent.build(saveInstanceEvent.getEvent()));
-
- builder.add(createTransaction(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory(5, 10L))));
- // No expectation - this op is ignored.
-
- builder.add(createTransaction(Op.removeJobUpdate(
- new RemoveJobUpdates().setKeys(ImmutableSet.of(UPDATE_ID.newBuilder())))));
- storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(UPDATE_ID));
-
- // NOOP LogEntry
- builder.add(LogEntry.noop(true));
-
- // Snapshot LogEntry
- Snapshot snapshot = new Snapshot();
- builder.add(LogEntry.snapshot(snapshot));
- snapshotStore.applySnapshot(snapshot);
-
- ImmutableSet.Builder<Entry> entryBuilder = ImmutableSet.builder();
- for (LogEntry logEntry : builder.build()) {
- Entry entry = createMock(Entry.class);
- entryBuilder.add(entry);
- expect(entry.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(logEntry));
- }
-
- expect(stream.readAll()).andReturn(entryBuilder.build().iterator());
- }
-
- private TaskConfig nonBackfilledConfig() {
- // When more fields have to be backfilled
- // modify this method.
- return makeConfig(JOB_KEY).newBuilder();
- }
-
- abstract class AbstractStorageFixture {
- private final AtomicBoolean runCalled = new AtomicBoolean(false);
-
- AbstractStorageFixture() {
- // Prevent otherwise silent noop tests that forget to call run().
- addTearDown(new TearDown() {
- @Override
- public void tearDown() {
- assertTrue(runCalled.get());
- }
- });
- }
-
- void run() throws Exception {
- runCalled.set(true);
-
- // Expect basic start operations.
-
- // Open the log stream.
- expect(log.open()).andReturn(stream);
-
- // Replay the log and perform and supplied initializationWork.
- // Simulate NOOP initialization work
- // Creating a mock and expecting apply(storeProvider) does not work here for whatever
- // reason.
- MutateWork.NoResult.Quiet initializationLogic = storeProvider -> {
- // No-op.
- };
-
- Capture<MutateWork.NoResult.Quiet> recoverAndInitializeWork = createCapture();
- storageUtil.storage.write(capture(recoverAndInitializeWork));
- expectLastCall().andAnswer(() -> {
- recoverAndInitializeWork.getValue().apply(storageUtil.mutableStoreProvider);
- return null;
- });
-
- expect(stream.readAll()).andReturn(Collections.emptyIterator());
- Capture<MutateWork<Void, RuntimeException>> recoveryWork = createCapture();
- expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer(
- () -> {
- recoveryWork.getValue().apply(storageUtil.mutableStoreProvider);
- return null;
- });
-
- // Schedule snapshots.
- schedulingService.doEvery(eq(SNAPSHOT_INTERVAL), notNull(Runnable.class));
-
- // Setup custom test expectations.
- setupExpectations();
-
- control.replay();
-
- // Start the system.
- logStorage.prepare();
- logStorage.start(initializationLogic);
-
- // Exercise the system.
- runTest();
- }
-
- protected void setupExpectations() throws Exception {
- // Default to no expectations.
- }
-
- protected abstract void runTest();
- }
-
- abstract class AbstractMutationFixture extends AbstractStorageFixture {
- @Override
- protected void runTest() {
- logStorage.write((Quiet) AbstractMutationFixture.this::performMutations);
- }
-
- protected abstract void performMutations(MutableStoreProvider storeProvider);
- }
-
- @Test
- public void testSaveFrameworkId() throws Exception {
- String frameworkId = "bob";
- new AbstractMutationFixture() {
- @Override
- protected void setupExpectations() throws CodingException {
- storageUtil.expectWrite();
- storageUtil.schedulerStore.saveFrameworkId(frameworkId);
- streamMatcher.expectTransaction(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)))
- .andReturn(position);
- }
-
- @Override
- protected void performMutations(MutableStoreProvider storeProvider) {
- storeProvider.getSchedulerStore().saveFrameworkId(frameworkId);
- }
- }.run();
- }
-
- @Test
- public void testSaveAcceptedJob() throws Exception {
- IJobConfiguration jobConfig =
- IJobConfiguration.build(new JobConfiguration().setKey(JOB_KEY.newBuilder()));
- new AbstractMutationFixture() {
- @Override
- protected void setupExpectations() throws Exception {
- storageUtil.expectWrite();
- storageUtil.jobStore.saveAcceptedJob(jobConfig);
- streamMatcher.expectTransaction(
- Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())))
- .andReturn(position);
- }
-
- @Override
- protected void performMutations(MutableStoreProvider storeProvider) {
- storeProvider.getCronJobStore().saveAcceptedJob(jobConfig);
- }
- }.run();
- }
-
- @Test
- public void testRemoveJob() throws Exception {
- new AbstractMutationFixture() {
- @Override
- protected void setupExpectations() throws Exception {
- storageUtil.expectWrite();
- storageUtil.jobStore.removeJob(JOB_KEY);
- streamMatcher.expectTransaction(
- Op.removeJob(new RemoveJob().setJobKey(JOB_KEY.newBuilder())))
- .andReturn(position);
- }
-
- @Override
- protected void performMutations(MutableStoreProvider storeProvider) {
- storeProvider.getCronJobStore().removeJob(JOB_KEY);
- }
- }.run();
- }
-
- @Test
- public void testSaveTasks() throws Exception {
- Set<IScheduledTask> tasks = ImmutableSet.of(task("a", ScheduleStatus.INIT));
- new AbstractMutationFixture() {
- @Override
- protected void setupExpectations() throws Exception {
- storageUtil.expectWrite();
- storageUtil.taskStore.saveTasks(tasks);
- streamMatcher.expectTransaction(
- Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(tasks))))
- .andReturn(position);
- }
-
- @Override
- protected void performMutations(MutableStoreProvider storeProvider) {
- storeProvider.getUnsafeTaskStore().saveTasks(tasks);
- }
- }.run();
- }
-
- @Test
- public void testMutateTasks() throws Exception {
- String taskId = "fred";
- Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
- Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
- new AbstractMutationFixture() {
- @Override
- protected void setupExpectations() throws Exception {
- storageUtil.expectWrite();
- expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
- streamMatcher.expectTransaction(
- Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))))
- .andReturn(null);
- }
-
- @Override
- protected void performMutations(MutableStoreProvider storeProvider) {
- assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
- }
- }.run();
- }
-
- @Test
- public void testNestedTransactions() throws Exception {
- String taskId = "fred";
- Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
- Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
- ImmutableSet<String> tasksToRemove = ImmutableSet.of("b");
-
- new AbstractMutationFixture() {
- @Override
- protected void setupExpectations() throws Exception {
- storageUtil.expectWrite();
- expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
-
- storageUtil.taskStore.deleteTasks(tasksToRemove);
-
- streamMatcher.expectTransaction(
- Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))),
- Op.removeTasks(new RemoveTasks(tasksToRemove)))
- .andReturn(position);
- }
-
- @Override
- protected void performMutations(MutableStoreProvider storeProvider) {
- assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
-
- logStorage.write((NoResult.Quiet)
- innerProvider -> innerProvider.getUnsafeTaskStore().deleteTasks(tasksToRemove));
- }
- }.run();
- }
-
- @Test
- public void testSaveAndMutateTasks() throws Exception {
- String taskId = "fred";
- Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
- Set<IScheduledTask> saved = ImmutableSet.of(task("a", ScheduleStatus.INIT));
- Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));
-
- new AbstractMutationFixture() {
- @Override
- protected void setupExpectations() throws Exception {
- storageUtil.expectWrite();
- storageUtil.taskStore.saveTasks(saved);
-
- // Nested transaction with result.
- expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
-
- // Resulting stream operation.
- streamMatcher.expectTransaction(Op.saveTasks(
- new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))))
- .andReturn(null);
- }
-
- @Override
- protected void performMutations(MutableStoreProvider storeProvider) {
- storeProvider.getUnsafeTaskStore().saveTasks(saved);
- assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
- }
- }.run();
- }
-
- @Test
- public void testSaveAndMutateTasksNoCoalesceUniqueIds() throws Exception {
- String taskId = "fred";
- Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
- Set<IScheduledTask> saved = ImmutableSet.of(task("b", ScheduleStatus.INIT));
- Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));
-
- new AbstractMutationFixture() {
- @Override
- protected void setupExpectations() throws Exception {
- storageUtil.expectWrite();
- storageUtil.taskStore.saveTasks(saved);
-
- // Nested transaction with result.
- expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
-
- // Resulting stream operation.
- streamMatcher.expectTransaction(
- Op.saveTasks(new SaveTasks(
- ImmutableSet.<ScheduledTask>builder()
- .addAll(IScheduledTask.toBuildersList(saved))
- .add(mutated.get().newBuilder())
- .build())))
- .andReturn(position);
- }
-
- @Override
- protected void performMutations(MutableStoreProvider storeProvider) {
- storeProvider.getUnsafeTaskStore().saveTasks(saved);
- assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
- }
- }.run();
- }
-
- @Test
- public void testRemoveTasksQuery() throws Exception {
- IScheduledTask task = task("a", ScheduleStatus.FINISHED);
- Set<String> taskIds = Tasks.ids(task);
- new AbstractMutationFixture() {
- @Override
- protected void setupExpectations() throws Exception {
- storageUtil.expectWrite();
- storageUtil.taskStore.deleteTasks(taskIds);
- streamMatcher.expectTransaction(Op.removeTasks(new RemoveTasks(taskIds)))
- .andReturn(position);
- }
-
- @Override
- protected void performMutations(MutableStoreProvider storeProvider) {
- storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
- }
- }.run();
- }
-
- @Test
- public void testRemoveTasksIds() throws Exception {
- Set<String> taskIds = ImmutableSet.of("42");
- new AbstractMutationFixture() {
- @Override
- protected void setupExpectations() throws Exception {
- storageUtil.expectWrite();
- storageUtil.taskStore.deleteTasks(taskIds);
- streamMatcher.expectTransaction(Op.removeTasks(new RemoveTasks(taskIds)))
- .andReturn(position);
- }
-
- @Override
- protected void performMutations(MutableStoreProvider storeProvider) {
- storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
- }
- }.run();
- }
-
- @Test
- public void testSaveQuota() throws Exception {
- String role = "role";
- IResourceAggregate quota = ResourceTestUtil.aggregate(1.0, 128L, 1024L);
-
- new AbstractMutationFixture() {
- @Override
- protected void setupExpectations() throws Exception {
- storageUtil.expectWrite();
- storageUtil.quotaStore.saveQuota(role, quota);
- streamMatcher.expectTransaction(Op.saveQuota(new SaveQuota(role, quota.newBuilder())))
- .andReturn(position);
- }
-
- @Override
- protected void performMutations(MutableStoreProvider storeProvider) {
- storeProvider.getQuotaStore().saveQuota(role, quota);
- }
- }.run();
- }
-
- @Test
- public void testRemoveQuota() throws Exception {
- String role = "role";
- new AbstractMutationFixture() {
- @Override
- protected void setupExpectations() throws Exception {
- storageUtil.expectWrite();
- storageUtil.quotaStore.removeQuota(role);
- streamMatcher.expectTransaction(Op.removeQuota(new RemoveQuota(role))).andReturn(position);
- }
-
- @Override
- protected void performMutations(MutableStoreProvider storeProvider) {
- storeProvider.getQuotaStore().removeQuota(role);
- }
- }.run();
- }
-
- @Test
- public void testSaveHostAttributes() throws Exception {
- String host = "hostname";
- Set<Attribute> attributes =
- ImmutableSet.of(new Attribute().setName("attr").setValues(ImmutableSet.of("value")));
- Optional<IHostAttributes> hostAttributes = Optional.of(
- IHostAttributes.build(new HostAttributes()
- .setHost(host)
- .setAttributes(attributes)));
-
- new AbstractMutationFixture() {
- @Override
- protected void setupExpectations() throws Exception {
- storageUtil.expectWrite();
- expect(storageUtil.attributeStore.getHostAttributes(host))
- .andReturn(Optional.absent());
-
- expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes);
-
- expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get())).andReturn(true);
- eventSink.post(new PubsubEvent.HostAttributesChanged(hostAttributes.get()));
- streamMatcher.expectTransaction(
- Op.saveHostAttributes(new SaveHostAttributes(hostAttributes.get().newBuilder())))
- .andReturn(position);
-
- expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get()))
- .andReturn(false);
-
- expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes);
- }
-
- @Override
- protected void performMutations(MutableStoreProvider storeProvider) {
- AttributeStore.Mutable store = storeProvider.getAttributeStore();
- assertEquals(Optional.absent(), store.getHostAttributes(host));
-
- assertTrue(store.saveHostAttributes(hostAttributes.get()));
-
- assertEquals(hostAttributes, store.getHostAttributes(host));
-
- assertFalse(store.saveHostAttributes(hostAttributes.get()));
-
- assertEquals(hostAttributes, store.getHostAttributes(host));
- }
- }.run();
- }
-
- @Test
- public void testSaveUpdate() throws Exception {
- IJobUpdate update = IJobUpdate.build(new JobUpdate()
- .setSummary(new JobUpdateSummary()
- .setKey(UPDATE_ID.newBuilder())
- .setUser("user"))
- .setInstructions(new JobUpdateInstructions()
- .setDesiredState(new InstanceTaskConfig()
- .setTask(new TaskConfig())
- .setInstances(ImmutableSet.of(new Range(0, 3))))
- .setInitialState(ImmutableSet.of(new InstanceTaskConfig()
- .setTask(new TaskConfig())
- .setInstances(ImmutableSet.of(new Range(0, 3)))))
- .setSettings(new JobUpdateSettings())));
-
- new AbstractMutationFixture() {
- @Override
- protected void setupExpectations() throws Exception {
- storageUtil.expectWrite();
- storageUtil.jobUpdateStore.saveJobUpdate(update);
- streamMatcher.expectTransaction(
- Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())))
- .andReturn(position);
- }
-
- @Override
- protected void performMutations(MutableStoreProvider storeProvider) {
- storeProvider.getJobUpdateStore().saveJobUpdate(update);
- }
- }.run();
- }
-
- @Test
- public void testSaveJobUpdateEvent() throws Exception {
- IJobUpdateEvent event = IJobUpdateEvent.build(new JobUpdateEvent()
- .setStatus(JobUpdateStatus.ROLLING_BACK)
- .setTimestampMs(12345L));
-
- new AbstractMutationFixture() {
- @Override
- protected void setupExpectations() throws Exception {
- storageUtil.expectWrite();
- storageUtil.jobUpdateStore.saveJobUpdateEvent(UPDATE_ID, event);
- streamMatcher.expectTransaction(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(
- event.newBuilder(),
- UPDATE_ID.newBuilder()))).andReturn(position);
- }
-
- @Override
- protected void performMutations(MutableStoreProvider storeProvider) {
- storeProvider.getJobUpdateStore().saveJobUpdateEvent(UPDATE_ID, event);
- }
- }.run();
- }
-
- @Test
- public void testSaveJobInstanceUpdateEvent() throws Exception {
- IJobInstanceUpdateEvent event = IJobInstanceUpdateEvent.build(new JobInstanceUpdateEvent()
- .setAction(JobUpdateAction.INSTANCE_ROLLING_BACK)
- .setTimestampMs(12345L)
- .setInstanceId(0));
-
- new AbstractMutationFixture() {
- @Override
- protected void setupExpectations() throws Exception {
- storageUtil.expectWrite();
- storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(UPDATE_ID, event);
- streamMatcher.expectTransaction(Op.saveJobInstanceUpdateEvent(
- new SaveJobInstanceUpdateEvent(
- event.newBuilder(),
- UPDATE_ID.newBuilder())))
- .andReturn(position);
- }
-
- @Override
- protected void performMutations(MutableStoreProvider storeProvider) {
- storeProvider.getJobUpdateStore().saveJobInstanceUpdateEvent(UPDATE_ID, event);
- }
- }.run();
- }
-
- @Test
- public void testRemoveJobUpdates() throws Exception {
- IJobUpdateKey key = IJobUpdateKey.build(new JobUpdateKey()
- .setJob(JOB_KEY.newBuilder())
- .setId("update-id"));
-
- new AbstractMutationFixture() {
- @Override
- protected void setupExpectations() throws Exception {
- storageUtil.expectWrite();
- storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(key));
-
- // No log transaction is generated since this version is currently in 'read-only'
- // compatibility mode for this operation type.
- }
-
- @Override
- protected void performMutations(MutableStoreProvider storeProvider) {
- storeProvider.getJobUpdateStore().removeJobUpdates(ImmutableSet.of(key));
- }
- }.run();
- }
-
- private LogEntry createTransaction(Op... ops) {
- return LogEntry.transaction(
- new Transaction(ImmutableList.copyOf(ops), storageConstants.CURRENT_SCHEMA_VERSION));
- }
-
- private static IScheduledTask task(String id, ScheduleStatus status) {
- return IScheduledTask.build(new ScheduledTask()
- .setStatus(status)
- .setAssignedTask(new AssignedTask()
- .setTaskId(id)
- .setTask(new TaskConfig())));
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
index f43a836..eb966d7 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
@@ -42,6 +42,7 @@ import org.apache.aurora.scheduler.config.types.TimeAmount;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.log.Log;
import org.apache.aurora.scheduler.resources.ResourceTestUtil;
+import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
import org.apache.aurora.scheduler.storage.SnapshotStore;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
@@ -61,6 +62,7 @@ public class NonVolatileStorageTest extends TearDownTestCase {
private FakeLog log;
private Runnable teardown = () -> { };
private NonVolatileStorage storage;
+ private DistributedSnapshotStore snapshotStore;
@Before
public void setUp() {
@@ -95,6 +97,7 @@ public class NonVolatileStorageTest extends TearDownTestCase {
}
);
storage = injector.getInstance(NonVolatileStorage.class);
+ snapshotStore = injector.getInstance(DistributedSnapshotStore.class);
storage.prepare();
storage.start(w -> { });
@@ -147,7 +150,7 @@ public class NonVolatileStorageTest extends TearDownTestCase {
});
// Result should survive another reset.
- storage.snapshot();
+ snapshotStore.snapshot();
resetStorage();
storage.read(stores -> {
transaction.getSecond().accept(stores);
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
index a1944c4..5634f92 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
@@ -55,6 +55,7 @@ import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/ThriftBackfillTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/ThriftBackfillTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/ThriftBackfillTest.java
deleted file mode 100644
index 59c2c5b..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/ThriftBackfillTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.TierManager;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.Resource.diskMb;
-import static org.apache.aurora.gen.Resource.namedPort;
-import static org.apache.aurora.gen.Resource.numCpus;
-import static org.apache.aurora.gen.Resource.ramMb;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-
-public class ThriftBackfillTest extends EasyMockTest {
-
- private ThriftBackfill thriftBackfill;
- private TierManager tierManager;
-
- @Before
- public void setUp() {
- tierManager = createMock(TierManager.class);
- thriftBackfill = new ThriftBackfill(tierManager);
- }
-
- @Test
- public void testFieldsToSetNoPorts() {
- TaskConfig config = new TaskConfig()
- .setResources(ImmutableSet.of(
- numCpus(1.0),
- ramMb(32),
- diskMb(64)))
- .setProduction(false)
- .setTier("tierName");
- TaskConfig expected = config.deepCopy()
- .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
- expect(tierManager.getTier(ITaskConfig.build(expected))).andReturn(TaskTestUtil.DEV_TIER);
-
- control.replay();
-
- assertEquals(
- expected,
- thriftBackfill.backfillTask(config));
- }
-
- @Test
- public void testResourceAggregateFieldsToSet() {
- control.replay();
-
- ResourceAggregate aggregate = new ResourceAggregate()
- .setNumCpus(1.0)
- .setRamMb(32)
- .setDiskMb(64);
-
- IResourceAggregate expected = IResourceAggregate.build(aggregate.deepCopy()
- .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64))));
-
- assertEquals(expected, ThriftBackfill.backfillResourceAggregate(aggregate));
- }
-
- @Test
- public void testResourceAggregateSetToFields() {
- control.replay();
-
- ResourceAggregate aggregate = new ResourceAggregate()
- .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
- IResourceAggregate expected = IResourceAggregate.build(aggregate.deepCopy()
- .setNumCpus(1.0)
- .setRamMb(32)
- .setDiskMb(64));
-
- assertEquals(expected, ThriftBackfill.backfillResourceAggregate(aggregate));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testResourceAggregateTooManyResources() {
- control.replay();
-
- ResourceAggregate aggregate = new ResourceAggregate()
- .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64), numCpus(2.0)));
- ThriftBackfill.backfillResourceAggregate(aggregate);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testResourceAggregateInvalidResources() {
- control.replay();
-
- ResourceAggregate aggregate = new ResourceAggregate()
- .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), namedPort("http")));
- ThriftBackfill.backfillResourceAggregate(aggregate);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testResourceAggregateMissingResources() {
- control.replay();
-
- ResourceAggregate aggregate = new ResourceAggregate()
- .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32)));
- ThriftBackfill.backfillResourceAggregate(aggregate);
- }
-
- @Test
- public void testBackfillTierProduction() {
- TaskConfig config = new TaskConfig()
- .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))
- .setProduction(true)
- .setTier("tierName");
- TaskConfig expected = config.deepCopy()
- .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
- expect(tierManager.getTier(ITaskConfig.build(expected))).andReturn(TaskTestUtil.PREFERRED_TIER);
-
- control.replay();
-
- assertEquals(
- expected,
- thriftBackfill.backfillTask(config));
- }
-
- @Test
- public void testBackfillTierNotProduction() {
- TaskConfig config = new TaskConfig()
- .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))
- .setProduction(true)
- .setTier("tierName");
- TaskConfig configWithBackfilledResources = config.deepCopy()
- .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
- expect(tierManager.getTier(ITaskConfig.build(configWithBackfilledResources)))
- .andReturn(TaskTestUtil.DEV_TIER);
-
- control.replay();
-
- TaskConfig expected = configWithBackfilledResources.deepCopy()
- .setProduction(false);
-
- assertEquals(
- expected,
- thriftBackfill.backfillTask(config));
- }
-
- @Test
- public void testBackfillTierSetsTierToPreemptible() {
- TaskConfig config = new TaskConfig()
- .setResources(ImmutableSet.of(
- numCpus(1.0),
- ramMb(32),
- diskMb(64)));
- TaskConfig configWithBackfilledResources = config.deepCopy()
- .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
- expect(tierManager.getTiers()).andReturn(TaskTestUtil.tierInfos());
-
- control.replay();
-
- TaskConfig expected = configWithBackfilledResources.deepCopy().setTier("preemptible");
-
- assertEquals(
- expected,
- thriftBackfill.backfillTask(config));
- }
-
- @Test
- public void testBackfillTierSetsTierToPreferred() {
- TaskConfig config = new TaskConfig()
- .setResources(ImmutableSet.of(
- numCpus(1.0),
- ramMb(32),
- diskMb(64)))
- .setProduction(true);
- TaskConfig configWithBackfilledResources = config.deepCopy()
- .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
- expect(tierManager.getTiers()).andReturn(TaskTestUtil.tierInfos());
-
- control.replay();
-
- TaskConfig expected = configWithBackfilledResources.deepCopy().setTier("preferred");
-
- assertEquals(
- expected,
- thriftBackfill.backfillTask(config));
- }
-
- @Test(expected = IllegalStateException.class)
- public void testBackfillTierBadTierConfiguration() {
- TaskConfig config = new TaskConfig()
- .setResources(ImmutableSet.of(
- numCpus(1.0),
- ramMb(32),
- diskMb(64)));
-
- expect(tierManager.getTiers()).andReturn(ImmutableMap.of());
-
- control.replay();
-
- thriftBackfill.backfillTask(config);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorageTest.java
deleted file mode 100644
index 8a99b36..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorageTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.JobUpdateKey;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveTasks;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.LoggerFactory;
-
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class WriteAheadStorageTest extends EasyMockTest {
-
- private LogStorage.TransactionManager transactionManager;
- private TaskStore.Mutable taskStore;
- private AttributeStore.Mutable attributeStore;
- private JobUpdateStore.Mutable jobUpdateStore;
- private EventSink eventSink;
- private WriteAheadStorage storage;
-
- @Before
- public void setUp() {
- transactionManager = createMock(LogStorage.TransactionManager.class);
- taskStore = createMock(TaskStore.Mutable.class);
- attributeStore = createMock(AttributeStore.Mutable.class);
- jobUpdateStore = createMock(JobUpdateStore.Mutable.class);
- eventSink = createMock(EventSink.class);
-
- storage = new WriteAheadStorage(
- transactionManager,
- createMock(SchedulerStore.Mutable.class),
- createMock(CronJobStore.Mutable.class),
- taskStore,
- createMock(QuotaStore.Mutable.class),
- attributeStore,
- jobUpdateStore,
- LoggerFactory.getLogger(WriteAheadStorageTest.class),
- eventSink);
- }
-
- private void expectOp(Op op) {
- expect(transactionManager.hasActiveTransaction()).andReturn(true);
- transactionManager.log(op);
- }
-
- @Test
- public void testRemoveUpdates() {
- Set<IJobUpdateKey> removed = ImmutableSet.of(
- IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "a")),
- IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "b")));
- jobUpdateStore.removeJobUpdates(removed);
- // No operation is written since this Op is in read-only compatibility mode.
-
- control.replay();
-
- storage.removeJobUpdates(removed);
- }
-
- @Test
- public void testMutate() {
- String taskId = "a";
- Function<IScheduledTask, IScheduledTask> mutator =
- createMock(new Clazz<Function<IScheduledTask, IScheduledTask>>() { });
- Optional<IScheduledTask> mutated = Optional.of(TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB));
-
- expect(taskStore.mutateTask(taskId, mutator)).andReturn(mutated);
- expectOp(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
-
- control.replay();
-
- assertEquals(mutated, storage.mutateTask(taskId, mutator));
- }
-
- @Test
- public void testSaveHostAttributes() {
- IHostAttributes attributes = IHostAttributes.build(
- new HostAttributes()
- .setHost("a")
- .setMode(MaintenanceMode.DRAINING)
- .setAttributes(ImmutableSet.of(
- new Attribute().setName("b").setValues(ImmutableSet.of("1", "2")))));
-
- expect(attributeStore.saveHostAttributes(attributes)).andReturn(true);
- expectOp(Op.saveHostAttributes(
- new SaveHostAttributes().setHostAttributes(attributes.newBuilder())));
- eventSink.post(new PubsubEvent.HostAttributesChanged(attributes));
-
- expect(attributeStore.saveHostAttributes(attributes)).andReturn(false);
-
- control.replay();
-
- assertTrue(storage.saveHostAttributes(attributes));
-
- assertFalse(storage.saveHostAttributes(attributes));
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testDeleteAllTasks() {
- control.replay();
- storage.deleteAllTasks();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testDeleteHostAttributes() {
- control.replay();
- storage.deleteHostAttributes();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testDeleteJobs() {
- control.replay();
- storage.deleteJobs();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testDeleteQuotas() {
- control.replay();
- storage.deleteQuotas();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testDeleteAllUpdatesAndEvents() {
- control.replay();
- storage.deleteAllUpdates();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 42a79a6..8837384 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -84,6 +84,7 @@ import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.state.StateChangeResult;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.state.UUIDGenerator;
+import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
import org.apache.aurora.scheduler.storage.Storage.StorageException;
import org.apache.aurora.scheduler.storage.backup.Recovery;
import org.apache.aurora.scheduler.storage.backup.StorageBackup;
@@ -176,6 +177,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
ImmutableSet.of(new Metadata("k1", "v1"), new Metadata("k2", "v2"));
private StorageTestUtil storageUtil;
+ private DistributedSnapshotStore snapshotStore;
private StorageBackup backup;
private Recovery recovery;
private MaintenanceController maintenance;
@@ -194,6 +196,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
public void setUp() throws Exception {
storageUtil = new StorageTestUtil(this);
storageUtil.expectOperations();
+ snapshotStore = createMock(DistributedSnapshotStore.class);
backup = createMock(StorageBackup.class);
recovery = createMock(Recovery.class);
maintenance = createMock(MaintenanceController.class);
@@ -212,6 +215,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
TaskTestUtil.CONFIGURATION_MANAGER,
THRESHOLDS,
storageUtil.storage,
+ snapshotStore,
backup,
recovery,
cronJobManager,
@@ -1105,10 +1109,10 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
@Test
public void testSnapshot() throws Exception {
- storageUtil.storage.snapshot();
+ snapshotStore.snapshot();
expectLastCall();
- storageUtil.storage.snapshot();
+ snapshotStore.snapshot();
expectLastCall().andThrow(new StorageException("mock error!"));
control.replay();
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index b2c371c..bb0fd89 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -62,6 +62,7 @@ import org.apache.aurora.scheduler.quota.QuotaModule;
import org.apache.aurora.scheduler.resources.ResourceTestUtil;
import org.apache.aurora.scheduler.resources.ResourceType;
import org.apache.aurora.scheduler.stats.StatsModule;
+import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
import org.apache.aurora.scheduler.storage.backup.Recovery;
@@ -135,6 +136,7 @@ public class ThriftIT extends EasyMockTest {
bind(FrameworkInfoFactoryImpl.class).in(Singleton.class);
bindMock(Recovery.class);
bindMock(StorageBackup.class);
+ bindMock(DistributedSnapshotStore.class);
bind(IServerInfo.class).toInstance(SERVER_INFO);
}
[3/4] aurora git commit: Extract a storage Persistence layer
Posted by wf...@apache.org.
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
deleted file mode 100644
index 07b4bdb..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ /dev/null
@@ -1,576 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
-import org.apache.aurora.common.application.ShutdownRegistry;
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.stats.SlidingStats;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.storage.LogEntry;
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.SaveCronJob;
-import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
-import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
-import org.apache.aurora.gen.storage.SaveQuota;
-import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.scheduler.base.AsyncUtil;
-import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
-import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A storage implementation that ensures committed transactions are written to a log.
- *
- * <p>In the classic write-ahead log usage we'd perform mutations as follows:
- * <ol>
- * <li>write op to log</li>
- * <li>perform op locally</li>
- * <li>*checkpoint</li>
- * </ol>
- *
- * <p>Writing the operation to the log provides us with a fast persistence mechanism to ensure we
- * have a record of our mutation in case we should need to recover state later after a crash or on
- * a new host (assuming the log is distributed). We then apply the mutation to a local (in-memory)
- * data structure for serving fast read requests and then optionally write down the position of the
- * log entry we wrote in the first step to stable storage to allow for quicker recovery after a
- * crash. Instead of reading the whole log, we can read all entries past the checkpoint. This
- * design implies that all mutations must be idempotent and free from constraint and thus
- * replayable over newer operations when recovering from an old checkpoint.
- *
- * <p>The important detail in our case is the possibility of writing an op to the log, and then
- * failing to commit locally since we use a local database instead of an in-memory data structure.
- * If we die after such a failure, then another instance can read and apply the logged op
- * erroneously.
- *
- * <p>This implementation leverages a local transaction to handle this:
- * <ol>
- * <li>start local transaction</li>
- * <li>perform op locally (uncommitted!)</li>
- * <li>write op to log</li>
- * <li>commit local transaction</li>
- * <li>*checkpoint</li>
- * </ol>
- *
- * <p>If the op fails to apply to local storage we will never write the op to the log and if the op
- * fails to apply to the log, it'll throw and abort the local storage transaction as well.
- */
-public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore {
-
- /**
- * A service that can schedule an action to be executed periodically.
- */
- @VisibleForTesting
- interface SchedulingService {
-
- /**
- * Schedules an action to execute periodically.
- *
- * @param interval The time period to wait until running the {@code action} again.
- * @param action The action to execute periodically.
- */
- void doEvery(Amount<Long, Time> interval, Runnable action);
- }
-
- /**
- * A maintainer for context about open transactions. Assumes that an external entity is
- * responsible for opening and closing transactions.
- */
- interface TransactionManager {
-
- /**
- * Checks whether there is an open transaction.
- *
- * @return {@code true} if there is an open transaction, {@code false} otherwise.
- */
- boolean hasActiveTransaction();
-
- /**
- * Adds an operation to the existing transaction.
- *
- * @param op Operation to include in the existing transaction.
- */
- void log(Op op);
- }
-
- private static class ScheduledExecutorSchedulingService implements SchedulingService {
- private final ScheduledExecutorService scheduledExecutor;
-
- ScheduledExecutorSchedulingService(ShutdownRegistry shutdownRegistry,
- Amount<Long, Time> shutdownGracePeriod) {
- scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor("LogStorage-%d", LOG);
- shutdownRegistry.addAction(() -> MoreExecutors.shutdownAndAwaitTermination(
- scheduledExecutor,
- shutdownGracePeriod.getValue(),
- shutdownGracePeriod.getUnit().getTimeUnit()));
- }
-
- @Override
- public void doEvery(Amount<Long, Time> interval, Runnable action) {
- requireNonNull(interval);
- requireNonNull(action);
-
- long delay = interval.getValue();
- TimeUnit timeUnit = interval.getUnit().getTimeUnit();
- scheduledExecutor.scheduleWithFixedDelay(action, delay, delay, timeUnit);
- }
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(LogStorage.class);
-
- private final LogManager logManager;
- private final SchedulingService schedulingService;
- private final SnapshotStore<Snapshot> snapshotStore;
- private final Amount<Long, Time> snapshotInterval;
- private final Storage writeBehindStorage;
- private final SchedulerStore.Mutable writeBehindSchedulerStore;
- private final CronJobStore.Mutable writeBehindJobStore;
- private final TaskStore.Mutable writeBehindTaskStore;
- private final QuotaStore.Mutable writeBehindQuotaStore;
- private final AttributeStore.Mutable writeBehindAttributeStore;
- private final JobUpdateStore.Mutable writeBehindJobUpdateStore;
- private final ReentrantLock writeLock;
- private final ThriftBackfill thriftBackfill;
-
- private StreamManager streamManager;
- private final WriteAheadStorage writeAheadStorage;
-
- // TODO(wfarner): It should be possible to remove this flag now, since all call stacks when
- // recovering are controlled at this layer (they're all calls to Mutable store implementations).
- // The more involved change is changing SnapshotStore to accept a Mutable store provider to
- // avoid a call to Storage.write() when we replay a Snapshot.
- private boolean recovered = false;
- private StreamTransaction transaction = null;
-
- private final SlidingStats writerWaitStats =
- new SlidingStats("log_storage_write_lock_wait", "ns");
-
- private final Map<LogEntry._Fields, Consumer<LogEntry>> logEntryReplayActions;
- private final Map<Op._Fields, Consumer<Op>> transactionReplayActions;
-
- @Inject
- LogStorage(
- LogManager logManager,
- ShutdownRegistry shutdownRegistry,
- Settings settings,
- SnapshotStore<Snapshot> snapshotStore,
- @Volatile Storage storage,
- @Volatile SchedulerStore.Mutable schedulerStore,
- @Volatile CronJobStore.Mutable jobStore,
- @Volatile TaskStore.Mutable taskStore,
- @Volatile QuotaStore.Mutable quotaStore,
- @Volatile AttributeStore.Mutable attributeStore,
- @Volatile JobUpdateStore.Mutable jobUpdateStore,
- EventSink eventSink,
- ReentrantLock writeLock,
- ThriftBackfill thriftBackfill) {
-
- this(logManager,
- new ScheduledExecutorSchedulingService(shutdownRegistry, settings.getShutdownGracePeriod()),
- snapshotStore,
- settings.getSnapshotInterval(),
- storage,
- schedulerStore,
- jobStore,
- taskStore,
- quotaStore,
- attributeStore,
- jobUpdateStore,
- eventSink,
- writeLock,
- thriftBackfill);
- }
-
- @VisibleForTesting
- LogStorage(
- LogManager logManager,
- SchedulingService schedulingService,
- SnapshotStore<Snapshot> snapshotStore,
- Amount<Long, Time> snapshotInterval,
- Storage delegateStorage,
- SchedulerStore.Mutable schedulerStore,
- CronJobStore.Mutable jobStore,
- TaskStore.Mutable taskStore,
- QuotaStore.Mutable quotaStore,
- AttributeStore.Mutable attributeStore,
- JobUpdateStore.Mutable jobUpdateStore,
- EventSink eventSink,
- ReentrantLock writeLock,
- ThriftBackfill thriftBackfill) {
-
- this.logManager = requireNonNull(logManager);
- this.schedulingService = requireNonNull(schedulingService);
- this.snapshotStore = requireNonNull(snapshotStore);
- this.snapshotInterval = requireNonNull(snapshotInterval);
-
- // Log storage has two distinct operating modes: pre- and post-recovery. When recovering,
- // we write directly to the writeBehind stores since we are replaying what's already persisted.
- // After that, all writes must succeed in the distributed log before they may be considered
- // successful.
- this.writeBehindStorage = requireNonNull(delegateStorage);
- this.writeBehindSchedulerStore = requireNonNull(schedulerStore);
- this.writeBehindJobStore = requireNonNull(jobStore);
- this.writeBehindTaskStore = requireNonNull(taskStore);
- this.writeBehindQuotaStore = requireNonNull(quotaStore);
- this.writeBehindAttributeStore = requireNonNull(attributeStore);
- this.writeBehindJobUpdateStore = requireNonNull(jobUpdateStore);
- this.writeLock = requireNonNull(writeLock);
- this.thriftBackfill = requireNonNull(thriftBackfill);
- TransactionManager transactionManager = new TransactionManager() {
- @Override
- public boolean hasActiveTransaction() {
- return transaction != null;
- }
-
- @Override
- public void log(Op op) {
- transaction.add(op);
- }
- };
- this.writeAheadStorage = new WriteAheadStorage(
- transactionManager,
- schedulerStore,
- jobStore,
- taskStore,
- quotaStore,
- attributeStore,
- jobUpdateStore,
- LoggerFactory.getLogger(WriteAheadStorage.class),
- eventSink);
-
- this.logEntryReplayActions = buildLogEntryReplayActions();
- this.transactionReplayActions = buildTransactionReplayActions();
- }
-
- @VisibleForTesting
- final Map<LogEntry._Fields, Consumer<LogEntry>> buildLogEntryReplayActions() {
- return ImmutableMap.<LogEntry._Fields, Consumer<LogEntry>>builder()
- .put(LogEntry._Fields.SNAPSHOT, logEntry -> {
- Snapshot snapshot = logEntry.getSnapshot();
- LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
- snapshotStore.applySnapshot(snapshot);
- })
- .put(LogEntry._Fields.TRANSACTION, logEntry -> write((NoResult.Quiet) unused -> {
- for (Op op : logEntry.getTransaction().getOps()) {
- replayOp(op);
- }
- }))
- .put(LogEntry._Fields.NOOP, item -> {
- // Nothing to do here
- })
- .build();
- }
-
- @VisibleForTesting
- final Map<Op._Fields, Consumer<Op>> buildTransactionReplayActions() {
- return ImmutableMap.<Op._Fields, Consumer<Op>>builder()
- .put(
- Op._Fields.SAVE_FRAMEWORK_ID,
- op -> writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId()))
- .put(Op._Fields.SAVE_CRON_JOB, op -> {
- SaveCronJob cronJob = op.getSaveCronJob();
- writeBehindJobStore.saveAcceptedJob(
- thriftBackfill.backfillJobConfiguration(cronJob.getJobConfig()));
- })
- .put(
- Op._Fields.REMOVE_JOB,
- op -> writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey())))
- .put(
- Op._Fields.SAVE_TASKS,
- op -> writeBehindTaskStore.saveTasks(
- thriftBackfill.backfillTasks(op.getSaveTasks().getTasks())))
- .put(
- Op._Fields.REMOVE_TASKS,
- op -> writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds()))
- .put(Op._Fields.SAVE_QUOTA, op -> {
- SaveQuota saveQuota = op.getSaveQuota();
- writeBehindQuotaStore.saveQuota(
- saveQuota.getRole(),
- ThriftBackfill.backfillResourceAggregate(saveQuota.getQuota()));
- })
- .put(
- Op._Fields.REMOVE_QUOTA,
- op -> writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole()))
- .put(Op._Fields.SAVE_HOST_ATTRIBUTES, op -> {
- HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
- // Prior to commit 5cf760b, the store would persist maintenance mode changes for
- // unknown hosts. 5cf760b began rejecting these, but the replicated log may still
- // contain entries with a null slave ID.
- if (attributes.isSetSlaveId()) {
- writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes));
- } else {
- LOG.info("Dropping host attributes with no agent ID: " + attributes);
- }
- })
- .put(Op._Fields.SAVE_JOB_UPDATE, op ->
- writeBehindJobUpdateStore.saveJobUpdate(
- thriftBackfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate())))
- .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, op -> {
- SaveJobUpdateEvent event = op.getSaveJobUpdateEvent();
- writeBehindJobUpdateStore.saveJobUpdateEvent(
- IJobUpdateKey.build(event.getKey()),
- IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()));
- })
- .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, op -> {
- SaveJobInstanceUpdateEvent event = op.getSaveJobInstanceUpdateEvent();
- writeBehindJobUpdateStore.saveJobInstanceUpdateEvent(
- IJobUpdateKey.build(event.getKey()),
- IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent()));
- })
- .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> {
- LOG.info("Dropping prune operation. Updates will be pruned later.");
- })
- .put(Op._Fields.REMOVE_JOB_UPDATE, op ->
- writeBehindJobUpdateStore.removeJobUpdates(
- IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys())))
- .build();
- }
-
- @Override
- @Timed("scheduler_storage_prepare")
- public synchronized void prepare() {
- writeBehindStorage.prepare();
- // Open the log to make a log replica available to the scheduler group.
- try {
- streamManager = logManager.open();
- } catch (IOException e) {
- throw new IllegalStateException("Failed to open the log, cannot continue", e);
- }
- }
-
- @Override
- @Timed("scheduler_storage_start")
- public synchronized void start(final MutateWork.NoResult.Quiet initializationLogic) {
- write((NoResult.Quiet) unused -> {
- // Must have the underlying storage started so we can query it for the last checkpoint.
- // We replay these entries in the forwarded storage system's transactions but not ours - we
- // do not want to re-record these ops to the log.
- recover();
- recovered = true;
-
- // Now that we're recovered we should let any mutations done in initializationLogic append
- // to the log, so run it in one of our transactions.
- write(initializationLogic);
- });
-
- scheduleSnapshots();
- }
-
- @Override
- public void stop() {
- // No-op.
- }
-
- @Timed("scheduler_log_recover")
- void recover() throws RecoveryFailedException {
- try {
- streamManager.readFromBeginning(LogStorage.this::replay);
- } catch (CodingException | InvalidPositionException | StreamAccessException e) {
- throw new RecoveryFailedException(e);
- }
- }
-
- private static final class RecoveryFailedException extends SchedulerException {
- RecoveryFailedException(Throwable cause) {
- super(cause);
- }
- }
-
- private void replay(final LogEntry logEntry) {
- LogEntry._Fields entryField = logEntry.getSetField();
- if (!logEntryReplayActions.containsKey(entryField)) {
- throw new IllegalStateException("Unknown log entry type: " + entryField);
- }
-
- logEntryReplayActions.get(entryField).accept(logEntry);
- }
-
- private void replayOp(Op op) {
- Op._Fields opField = op.getSetField();
- if (!transactionReplayActions.containsKey(opField)) {
- throw new IllegalStateException("Unknown transaction op: " + opField);
- }
-
- transactionReplayActions.get(opField).accept(op);
- }
-
- private void scheduleSnapshots() {
- if (snapshotInterval.getValue() > 0) {
- schedulingService.doEvery(snapshotInterval, () -> {
- try {
- snapshot();
- } catch (StorageException e) {
- if (e.getCause() == null) {
- LOG.warn("StorageException when attempting to snapshot.", e);
- } else {
- LOG.warn(e.getMessage(), e.getCause());
- }
- }
- });
- }
- }
-
- /**
- * Forces a snapshot of the storage state.
- *
- * @throws CodingException If there is a problem encoding the snapshot.
- * @throws InvalidPositionException If the log stream cursor is invalid.
- * @throws StreamAccessException If there is a problem writing the snapshot to the log stream.
- */
- @Timed("scheduler_log_snapshot")
- void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException {
- write((NoResult<CodingException>) (MutableStoreProvider unused) -> {
- LOG.info("Creating snapshot.");
- Snapshot snapshot = snapshotStore.createSnapshot();
- persist(snapshot);
- LOG.info("Snapshot complete."
- + " host attrs: " + snapshot.getHostAttributesSize()
- + ", cron jobs: " + snapshot.getCronJobsSize()
- + ", quota confs: " + snapshot.getQuotaConfigurationsSize()
- + ", tasks: " + snapshot.getTasksSize()
- + ", updates: " + snapshot.getJobUpdateDetailsSize());
- });
- }
-
- @Timed("scheduler_log_snapshot_persist")
- @Override
- public void persist(Snapshot snapshot)
- throws CodingException, InvalidPositionException, StreamAccessException {
-
- streamManager.snapshot(snapshot);
- }
-
- private <T, E extends Exception> T doInTransaction(final MutateWork<T, E> work)
- throws StorageException, E {
-
- // The log stream transaction has already been set up so we just need to delegate with our
- // store provider so any mutations performed by work get logged.
- if (transaction != null) {
- return work.apply(writeAheadStorage);
- }
-
- transaction = streamManager.startTransaction();
- try {
- return writeBehindStorage.write(unused -> {
- T result = work.apply(writeAheadStorage);
- try {
- transaction.commit();
- } catch (CodingException e) {
- throw new IllegalStateException(
- "Problem encoding transaction operations to the log stream", e);
- } catch (StreamAccessException e) {
- throw new StorageException(
- "There was a problem committing the transaction to the log.", e);
- }
- return result;
- });
- } finally {
- transaction = null;
- }
- }
-
- @Override
- public <T, E extends Exception> T write(final MutateWork<T, E> work) throws StorageException, E {
- long waitStart = System.nanoTime();
- writeLock.lock();
- try {
- writerWaitStats.accumulate(System.nanoTime() - waitStart);
- // We don't want to use the log when recovering from it, we just want to update the underlying
- // store - so pass mutations straight through to the underlying storage.
- if (!recovered) {
- return writeBehindStorage.write(work);
- }
-
- return doInTransaction(work);
- } finally {
- writeLock.unlock();
- }
- }
-
- @Override
- public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
- return writeBehindStorage.read(work);
- }
-
- @Override
- public void snapshot() throws StorageException {
- try {
- doSnapshot();
- } catch (CodingException e) {
- throw new StorageException("Failed to encode a snapshot", e);
- } catch (InvalidPositionException e) {
- throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e);
- } catch (StreamAccessException e) {
- throw new StorageException("Failed to create a snapshot", e);
- }
- }
-
- /**
- * Configuration settings for log storage.
- */
- public static class Settings {
- private final Amount<Long, Time> shutdownGracePeriod;
- private final Amount<Long, Time> snapshotInterval;
-
- public Settings(Amount<Long, Time> shutdownGracePeriod, Amount<Long, Time> snapshotInterval) {
- this.shutdownGracePeriod = requireNonNull(shutdownGracePeriod);
- this.snapshotInterval = requireNonNull(snapshotInterval);
- }
-
- public Amount<Long, Time> getShutdownGracePeriod() {
- return shutdownGracePeriod;
- }
-
- public Amount<Long, Time> getSnapshotInterval() {
- return snapshotInterval;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
index c8dc7ad..75ec42a 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
@@ -32,8 +32,10 @@ import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage;
import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import org.apache.aurora.scheduler.storage.durability.DurableStorage;
+import org.apache.aurora.scheduler.storage.durability.Persistence;
import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
-import org.apache.aurora.scheduler.storage.log.LogStorage.Settings;
+import org.apache.aurora.scheduler.storage.log.LogPersistence.Settings;
import static org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl;
import static org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction;
@@ -77,10 +79,13 @@ public class LogStorageModule extends PrivateModule {
bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
.toInstance(options.maxLogEntrySize);
bind(LogManager.class).in(Singleton.class);
- bind(LogStorage.class).in(Singleton.class);
+ bind(DurableStorage.class).in(Singleton.class);
- install(CallOrderEnforcingStorage.wrappingModule(LogStorage.class));
- bind(DistributedSnapshotStore.class).to(LogStorage.class);
+ install(CallOrderEnforcingStorage.wrappingModule(DurableStorage.class));
+ bind(LogPersistence.class).in(Singleton.class);
+ bind(Persistence.class).to(LogPersistence.class);
+ bind(DistributedSnapshotStore.class).to(LogPersistence.class);
+ expose(Persistence.class);
expose(Storage.class);
expose(NonVolatileStorage.class);
expose(DistributedSnapshotStore.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index 5859f80..739fad7 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -48,6 +48,7 @@ import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.Storage.Volatile;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
index ea147c0..18da32d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
@@ -13,7 +13,7 @@
*/
package org.apache.aurora.scheduler.storage.log;
-import java.util.function.Consumer;
+import java.util.Iterator;
import org.apache.aurora.gen.storage.LogEntry;
import org.apache.aurora.gen.storage.Snapshot;
@@ -25,23 +25,21 @@ import static org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
/**
* Manages interaction with the log stream. Log entries can be
- * {@link #readFromBeginning(Consumer) read from} the beginning,
+ * {@link #readFromBeginning() read from} the beginning,
* a {@link #startTransaction() transaction} consisting of one or more local storage
* operations can be committed atomically, or the log can be compacted by
* {@link #snapshot(org.apache.aurora.gen.storage.Snapshot) snapshotting}.
*/
public interface StreamManager {
/**
- * Reads all entries in the log stream after the given position. If the position
- * supplied is {@code null} then all log entries in the stream will be read.
+ * Reads all entries in the log stream.
*
- * @param reader A reader that will be handed log entries decoded from the stream.
+ * @return All stored log entries.
* @throws CodingException if there was a problem decoding a log entry from the stream.
* @throws InvalidPositionException if the given position is not found in the log.
* @throws StreamAccessException if there is a problem reading from the log.
*/
- void readFromBeginning(Consumer<LogEntry> reader)
- throws CodingException, InvalidPositionException, StreamAccessException;
+ Iterator<LogEntry> readFromBeginning() throws CodingException, StreamAccessException;
/**
* Truncates all entries in the log stream occuring before the given position. The entry at the
@@ -54,8 +52,7 @@ public interface StreamManager {
void truncateBefore(Log.Position position);
/**
- * Starts a transaction that can be used to commit a series of {@link Op}s to the log stream
- * atomically.
+ * Starts a transaction that can be used to commit a series of ops to the log stream atomically.
*
* @return StreamTransaction A transaction manager to handle batching up commits to the
* underlying stream.
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
index baf2647..c5b107f 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
@@ -19,12 +19,12 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
import javax.annotation.Nullable;
import javax.inject.Inject;
import com.google.common.base.Preconditions;
+import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
@@ -95,31 +95,37 @@ class StreamManagerImpl implements StreamManager {
}
@Override
- public void readFromBeginning(Consumer<LogEntry> reader)
+ public Iterator<LogEntry> readFromBeginning()
throws CodingException, InvalidPositionException, StreamAccessException {
Iterator<Log.Entry> entries = stream.readAll();
- while (entries.hasNext()) {
- LogEntry logEntry = decodeLogEntry(entries.next());
- while (logEntry != null && isFrame(logEntry)) {
- logEntry = tryDecodeFrame(logEntry.getFrame(), entries);
- }
- if (logEntry != null) {
- if (logEntry.isSet(LogEntry._Fields.DEFLATED_ENTRY)) {
- logEntry = Entries.inflate(logEntry);
- vars.deflatedEntriesRead.incrementAndGet();
- }
-
- if (logEntry.isSetDeduplicatedSnapshot()) {
- logEntry = LogEntry.snapshot(
- snapshotDeduplicator.reduplicate(logEntry.getDeduplicatedSnapshot()));
+ return new AbstractIterator<LogEntry>() {
+ @Override
+ protected LogEntry computeNext() {
+ while (entries.hasNext()) {
+ LogEntry logEntry = decodeLogEntry(entries.next());
+ while (logEntry != null && isFrame(logEntry)) {
+ logEntry = tryDecodeFrame(logEntry.getFrame(), entries);
+ }
+ if (logEntry != null) {
+ if (logEntry.isSet(LogEntry._Fields.DEFLATED_ENTRY)) {
+ logEntry = Entries.inflate(logEntry);
+ vars.deflatedEntriesRead.incrementAndGet();
+ }
+
+ if (logEntry.isSetDeduplicatedSnapshot()) {
+ logEntry = LogEntry.snapshot(
+ snapshotDeduplicator.reduplicate(logEntry.getDeduplicatedSnapshot()));
+ }
+
+ vars.entriesRead.incrementAndGet();
+ return logEntry;
+ }
}
-
- reader.accept(logEntry);
- vars.entriesRead.incrementAndGet();
+ return endOfData();
}
- }
+ };
}
@Nullable
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java
deleted file mode 100644
index 92b64bb..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.util.EnumSet;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import com.google.inject.Inject;
-
-import org.apache.aurora.GuavaUtils;
-import org.apache.aurora.gen.JobConfiguration;
-import org.apache.aurora.gen.JobUpdate;
-import org.apache.aurora.gen.JobUpdateInstructions;
-import org.apache.aurora.gen.Resource;
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.TierInfo;
-import org.apache.aurora.scheduler.TierManager;
-import org.apache.aurora.scheduler.quota.QuotaManager;
-import org.apache.aurora.scheduler.resources.ResourceType;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IResource;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static java.lang.String.format;
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
-import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
-import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
-
-/**
- * Helps migrating thrift schema by populating deprecated and/or replacement fields.
- */
-public final class ThriftBackfill {
-
- private final TierManager tierManager;
-
- @Inject
- public ThriftBackfill(TierManager tierManager) {
- this.tierManager = requireNonNull(tierManager);
- }
-
- private static Resource getResource(Set<Resource> resources, ResourceType type) {
- return resources.stream()
- .filter(e -> ResourceType.fromResource(IResource.build(e)).equals(type))
- .findFirst()
- .orElseThrow(() ->
- new IllegalArgumentException("Missing resource definition for " + type));
- }
-
- /**
- * Ensures TaskConfig.resources and correspondent task-level fields are all populated.
- *
- * @param config TaskConfig to backfill.
- * @return Backfilled TaskConfig.
- */
- public TaskConfig backfillTask(TaskConfig config) {
- backfillTier(config);
- return config;
- }
-
- private void backfillTier(TaskConfig config) {
- ITaskConfig taskConfig = ITaskConfig.build(config);
- if (config.isSetTier()) {
- TierInfo tier = tierManager.getTier(taskConfig);
- config.setProduction(!tier.isPreemptible() && !tier.isRevocable());
- } else {
- config.setTier(tierManager.getTiers()
- .entrySet()
- .stream()
- .filter(e -> e.getValue().isPreemptible() == !taskConfig.isProduction()
- && !e.getValue().isRevocable())
- .findFirst()
- .orElseThrow(() -> new IllegalStateException(
- format("No matching implicit tier for task of job %s", taskConfig.getJob())))
- .getKey());
- }
- }
-
- /**
- * Backfills JobConfiguration. See {@link #backfillTask(TaskConfig)}.
- *
- * @param jobConfig JobConfiguration to backfill.
- * @return Backfilled JobConfiguration.
- */
- public IJobConfiguration backfillJobConfiguration(JobConfiguration jobConfig) {
- backfillTask(jobConfig.getTaskConfig());
- return IJobConfiguration.build(jobConfig);
- }
-
- /**
- * Backfills set of tasks. See {@link #backfillTask(TaskConfig)}.
- *
- * @param tasks Set of tasks to backfill.
- * @return Backfilled set of tasks.
- */
- public Set<IScheduledTask> backfillTasks(Set<ScheduledTask> tasks) {
- return tasks.stream()
- .map(t -> backfillScheduledTask(t))
- .map(IScheduledTask::build)
- .collect(GuavaUtils.toImmutableSet());
- }
-
- /**
- * Ensures ResourceAggregate.resources and correspondent deprecated fields are all populated.
- *
- * @param aggregate ResourceAggregate to backfill.
- * @return Backfilled IResourceAggregate.
- */
- public static IResourceAggregate backfillResourceAggregate(ResourceAggregate aggregate) {
- if (!aggregate.isSetResources() || aggregate.getResources().isEmpty()) {
- aggregate.addToResources(Resource.numCpus(aggregate.getNumCpus()));
- aggregate.addToResources(Resource.ramMb(aggregate.getRamMb()));
- aggregate.addToResources(Resource.diskMb(aggregate.getDiskMb()));
- } else {
- EnumSet<ResourceType> quotaResources = QuotaManager.QUOTA_RESOURCE_TYPES;
- if (aggregate.getResources().size() > quotaResources.size()) {
- throw new IllegalArgumentException("Too many resource values in quota.");
- }
-
- if (!quotaResources.equals(aggregate.getResources().stream()
- .map(e -> ResourceType.fromResource(IResource.build(e)))
- .collect(Collectors.toSet()))) {
-
- throw new IllegalArgumentException("Quota resources must be exactly: " + quotaResources);
- }
- aggregate.setNumCpus(
- getResource(aggregate.getResources(), CPUS).getNumCpus());
- aggregate.setRamMb(
- getResource(aggregate.getResources(), RAM_MB).getRamMb());
- aggregate.setDiskMb(
- getResource(aggregate.getResources(), DISK_MB).getDiskMb());
- }
- return IResourceAggregate.build(aggregate);
- }
-
- private ScheduledTask backfillScheduledTask(ScheduledTask task) {
- backfillTask(task.getAssignedTask().getTask());
- return task;
- }
-
- /**
- * Backfills JobUpdate. See {@link #backfillTask(TaskConfig)}.
- *
- * @param update JobUpdate to backfill.
- * @return Backfilled job update.
- */
- IJobUpdate backFillJobUpdate(JobUpdate update) {
- JobUpdateInstructions instructions = update.getInstructions();
- if (instructions.isSetDesiredState()) {
- backfillTask(instructions.getDesiredState().getTask());
- }
-
- instructions.getInitialState().forEach(e -> backfillTask(e.getTask()));
-
- return IJobUpdate.build(update);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
deleted file mode 100644
index 41061f8..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.RemoveJob;
-import org.apache.aurora.gen.storage.RemoveQuota;
-import org.apache.aurora.gen.storage.RemoveTasks;
-import org.apache.aurora.gen.storage.SaveCronJob;
-import org.apache.aurora.gen.storage.SaveFrameworkId;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
-import org.apache.aurora.gen.storage.SaveJobUpdate;
-import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
-import org.apache.aurora.gen.storage.SaveQuota;
-import org.apache.aurora.gen.storage.SaveTasks;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.slf4j.Logger;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.scheduler.storage.log.LogStorage.TransactionManager;
-
-/**
- * Mutable stores implementation that translates all operations to {@link Op}s (which are passed
- * to a provided {@link TransactionManager}) before forwarding the operations to delegate mutable
- * stores.
- */
-class WriteAheadStorage implements
- MutableStoreProvider,
- SchedulerStore.Mutable,
- CronJobStore.Mutable,
- TaskStore.Mutable,
- QuotaStore.Mutable,
- AttributeStore.Mutable,
- JobUpdateStore.Mutable {
-
- private final TransactionManager transactionManager;
- private final SchedulerStore.Mutable schedulerStore;
- private final CronJobStore.Mutable jobStore;
- private final TaskStore.Mutable taskStore;
- private final QuotaStore.Mutable quotaStore;
- private final AttributeStore.Mutable attributeStore;
- private final JobUpdateStore.Mutable jobUpdateStore;
- private final Logger log;
- private final EventSink eventSink;
-
- /**
- * Creates a new write-ahead storage that delegates to the providing default stores.
- *
- * @param transactionManager External controller for transaction operations.
- * @param schedulerStore Delegate.
- * @param jobStore Delegate.
- * @param taskStore Delegate.
- * @param quotaStore Delegate.
- * @param attributeStore Delegate.
- * @param jobUpdateStore Delegate.
- */
- WriteAheadStorage(
- TransactionManager transactionManager,
- SchedulerStore.Mutable schedulerStore,
- CronJobStore.Mutable jobStore,
- TaskStore.Mutable taskStore,
- QuotaStore.Mutable quotaStore,
- AttributeStore.Mutable attributeStore,
- JobUpdateStore.Mutable jobUpdateStore,
- Logger log,
- EventSink eventSink) {
-
- this.transactionManager = requireNonNull(transactionManager);
- this.schedulerStore = requireNonNull(schedulerStore);
- this.jobStore = requireNonNull(jobStore);
- this.taskStore = requireNonNull(taskStore);
- this.quotaStore = requireNonNull(quotaStore);
- this.attributeStore = requireNonNull(attributeStore);
- this.jobUpdateStore = requireNonNull(jobUpdateStore);
- this.log = requireNonNull(log);
- this.eventSink = requireNonNull(eventSink);
- }
-
- private void write(Op op) {
- Preconditions.checkState(
- transactionManager.hasActiveTransaction(),
- "Mutating operations must be within a transaction.");
- transactionManager.log(op);
- }
-
- @Override
- public void saveFrameworkId(final String frameworkId) {
- requireNonNull(frameworkId);
-
- write(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
- schedulerStore.saveFrameworkId(frameworkId);
- }
-
- @Override
- public void deleteTasks(final Set<String> taskIds) {
- requireNonNull(taskIds);
-
- write(Op.removeTasks(new RemoveTasks(taskIds)));
- taskStore.deleteTasks(taskIds);
- }
-
- @Override
- public void saveTasks(final Set<IScheduledTask> newTasks) {
- requireNonNull(newTasks);
-
- write(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
- taskStore.saveTasks(newTasks);
- }
-
- @Override
- public Optional<IScheduledTask> mutateTask(
- String taskId,
- Function<IScheduledTask, IScheduledTask> mutator) {
-
- Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator);
- log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus());
- write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
-
- return mutated;
- }
-
- @Override
- public void saveQuota(final String role, final IResourceAggregate quota) {
- requireNonNull(role);
- requireNonNull(quota);
-
- write(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
- quotaStore.saveQuota(role, quota);
- }
-
- @Override
- public boolean saveHostAttributes(final IHostAttributes attrs) {
- requireNonNull(attrs);
-
- boolean changed = attributeStore.saveHostAttributes(attrs);
- if (changed) {
- write(Op.saveHostAttributes(new SaveHostAttributes(attrs.newBuilder())));
- eventSink.post(new PubsubEvent.HostAttributesChanged(attrs));
- }
- return changed;
- }
-
- @Override
- public void removeJob(final IJobKey jobKey) {
- requireNonNull(jobKey);
-
- write(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
- jobStore.removeJob(jobKey);
- }
-
- @Override
- public void saveAcceptedJob(final IJobConfiguration jobConfig) {
- requireNonNull(jobConfig);
-
- write(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())));
- jobStore.saveAcceptedJob(jobConfig);
- }
-
- @Override
- public void removeQuota(final String role) {
- requireNonNull(role);
-
- write(Op.removeQuota(new RemoveQuota(role)));
- quotaStore.removeQuota(role);
- }
-
- @Override
- public void saveJobUpdate(IJobUpdate update) {
- requireNonNull(update);
-
- write(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())));
- jobUpdateStore.saveJobUpdate(update);
- }
-
- @Override
- public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
- requireNonNull(key);
- requireNonNull(event);
-
- write(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(event.newBuilder(), key.newBuilder())));
- jobUpdateStore.saveJobUpdateEvent(key, event);
- }
-
- @Override
- public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) {
- requireNonNull(key);
- requireNonNull(event);
-
- write(Op.saveJobInstanceUpdateEvent(
- new SaveJobInstanceUpdateEvent(event.newBuilder(), key.newBuilder())));
- jobUpdateStore.saveJobInstanceUpdateEvent(key, event);
- }
-
- @Override
- public void removeJobUpdates(Set<IJobUpdateKey> keys) {
- requireNonNull(keys);
-
- // Compatibility mode - RemoveJobUpdates is not yet written since older versions cannot
- // read it. JobUpdates are only removed implicitly when a snapshot is taken.
- jobUpdateStore.removeJobUpdates(keys);
- }
-
- @Override
- public void deleteAllTasks() {
- throw new UnsupportedOperationException(
- "Unsupported since casual storage users should never be doing this.");
- }
-
- @Override
- public void deleteHostAttributes() {
- throw new UnsupportedOperationException(
- "Unsupported since casual storage users should never be doing this.");
- }
-
- @Override
- public void deleteJobs() {
- throw new UnsupportedOperationException(
- "Unsupported since casual storage users should never be doing this.");
- }
-
- @Override
- public void deleteQuotas() {
- throw new UnsupportedOperationException(
- "Unsupported since casual storage users should never be doing this.");
- }
-
- @Override
- public void deleteAllUpdates() {
- throw new UnsupportedOperationException(
- "Unsupported since casual storage users should never be doing this.");
- }
-
- @Override
- public SchedulerStore.Mutable getSchedulerStore() {
- return this;
- }
-
- @Override
- public CronJobStore.Mutable getCronJobStore() {
- return this;
- }
-
- @Override
- public TaskStore.Mutable getUnsafeTaskStore() {
- return this;
- }
-
- @Override
- public QuotaStore.Mutable getQuotaStore() {
- return this;
- }
-
- @Override
- public AttributeStore.Mutable getAttributeStore() {
- return this;
- }
-
- @Override
- public TaskStore getTaskStore() {
- return this;
- }
-
- @Override
- public JobUpdateStore.Mutable getJobUpdateStore() {
- return this;
- }
-
- @Override
- public Optional<String> fetchFrameworkId() {
- return this.schedulerStore.fetchFrameworkId();
- }
-
- @Override
- public Iterable<IJobConfiguration> fetchJobs() {
- return this.jobStore.fetchJobs();
- }
-
- @Override
- public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) {
- return this.jobStore.fetchJob(jobKey);
- }
-
- @Override
- public Optional<IScheduledTask> fetchTask(String taskId) {
- return this.taskStore.fetchTask(taskId);
- }
-
- @Override
- public Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
- return this.taskStore.fetchTasks(query);
- }
-
- @Override
- public Set<IJobKey> getJobKeys() {
- return this.taskStore.getJobKeys();
- }
-
- @Override
- public Optional<IResourceAggregate> fetchQuota(String role) {
- return this.quotaStore.fetchQuota(role);
- }
-
- @Override
- public Map<String, IResourceAggregate> fetchQuotas() {
- return this.quotaStore.fetchQuotas();
- }
-
- @Override
- public Optional<IHostAttributes> getHostAttributes(String host) {
- return this.attributeStore.getHostAttributes(host);
- }
-
- @Override
- public Set<IHostAttributes> getHostAttributes() {
- return this.attributeStore.getHostAttributes();
- }
-
- @Override
- public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
- return this.jobUpdateStore.fetchJobUpdates(query);
- }
-
- @Override
- public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
- return this.jobUpdateStore.fetchJobUpdate(key);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 2cc567d..a519b07 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -83,11 +83,13 @@ import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.state.StateChangeResult;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.state.UUIDGenerator;
+import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.backup.Recovery;
import org.apache.aurora.scheduler.storage.backup.StorageBackup;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IHostStatus;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
@@ -99,7 +101,6 @@ import org.apache.aurora.scheduler.storage.entities.IMetadata;
import org.apache.aurora.scheduler.storage.entities.IRange;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
import org.apache.aurora.scheduler.thrift.aop.AnnotatedAuroraAdmin;
import org.apache.aurora.scheduler.thrift.aop.ThriftWorkload;
import org.apache.aurora.scheduler.thrift.auth.DecoratedThrift;
@@ -167,6 +168,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
private final ConfigurationManager configurationManager;
private final Thresholds thresholds;
private final NonVolatileStorage storage;
+ private final DistributedSnapshotStore snapshotStore;
private final StorageBackup backup;
private final Recovery recovery;
private final MaintenanceController maintenance;
@@ -195,6 +197,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
ConfigurationManager configurationManager,
Thresholds thresholds,
NonVolatileStorage storage,
+ DistributedSnapshotStore snapshotStore,
StorageBackup backup,
Recovery recovery,
CronJobManager cronJobManager,
@@ -211,6 +214,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
this.configurationManager = requireNonNull(configurationManager);
this.thresholds = requireNonNull(thresholds);
this.storage = requireNonNull(storage);
+ this.snapshotStore = requireNonNull(snapshotStore);
this.backup = requireNonNull(backup);
this.recovery = requireNonNull(recovery);
this.maintenance = requireNonNull(maintenance);
@@ -635,7 +639,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
@Override
public Response snapshot() {
- storage.snapshot();
+ snapshotStore.snapshot();
return ok();
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
index 8cf6871..e82b637 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
@@ -36,11 +36,6 @@ public class FakeNonVolatileStorage implements NonVolatileStorage {
}
@Override
- public void snapshot() throws StorageException {
- // No-op.
- }
-
- @Override
public void start(Quiet initializationLogic) throws StorageException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
index c639ab6..aeb8685 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
@@ -26,6 +26,7 @@ import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.util.Modules;
+import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.scheduler.TierModule;
import org.apache.aurora.scheduler.app.SchedulerMain;
import org.apache.aurora.scheduler.app.local.simulator.ClusterSimulatorModule;
@@ -82,7 +83,17 @@ public final class LocalSchedulerMain {
protected void configure() {
bind(Storage.class).to(Key.get(Storage.class, Storage.Volatile.class));
bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class);
- bind(DistributedSnapshotStore.class).toInstance(snapshot -> { });
+ bind(DistributedSnapshotStore.class).toInstance(new DistributedSnapshotStore() {
+ @Override
+ public void snapshot() throws Storage.StorageException {
+ // no-op
+ }
+
+ @Override
+ public void snapshotWith(Snapshot snapshot) {
+ // no-op
+ }
+ });
}
};
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
index 7138d6b..09560f4 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
@@ -98,7 +98,7 @@ public class RecoveryTest extends EasyMockTest {
Capture<MutateWork<Object, Exception>> transaction = createCapture();
expect(primaryStorage.write(capture(transaction))).andReturn(null);
Capture<Snapshot> snapshot = createCapture();
- distributedStore.persist(capture(snapshot));
+ distributedStore.snapshotWith(capture(snapshot));
shutDownNow.execute();
control.replay();
@@ -127,7 +127,7 @@ public class RecoveryTest extends EasyMockTest {
Capture<MutateWork<Object, Exception>> transaction = createCapture();
expect(primaryStorage.write(capture(transaction))).andReturn(null);
Capture<Snapshot> snapshot = createCapture();
- distributedStore.persist(capture(snapshot));
+ distributedStore.snapshotWith(capture(snapshot));
shutDownNow.execute();
control.replay();
[4/4] aurora git commit: Extract a storage Persistence layer
Posted by wf...@apache.org.
Extract a storage Persistence layer
This extracts the `Log`- and `Snapshot`-specific details from `LogStorage`,
leaving `DurableStorage`. `DurableStorage` is useful as a general-purpose
`Storage` mutation observer, with `Persistence` being the minimal behavior
that an underlying durability layer needs to provide.
Reviewed at https://reviews.apache.org/r/64234/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/cea43db9
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/cea43db9
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/cea43db9
Branch: refs/heads/master
Commit: cea43db9ded1201f69a85a43fb67244c69cf5347
Parents: de8b375
Author: Bill Farner <wf...@apache.org>
Authored: Sat Dec 2 19:59:03 2017 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Sat Dec 2 19:59:03 2017 -0800
----------------------------------------------------------------------
.../apache/aurora/codec/ThriftBinaryCodec.java | 2 +-
.../aurora/scheduler/base/TaskTestUtil.java | 2 +-
.../configuration/ConfigurationManager.java | 2 +-
.../scheduler/resources/ResourceManager.java | 2 +-
.../storage/CallOrderEnforcingStorage.java | 6 -
.../storage/DistributedSnapshotStore.java | 15 +-
.../aurora/scheduler/storage/Storage.java | 10 -
.../scheduler/storage/backup/Recovery.java | 2 +-
.../storage/backup/TemporaryStorage.java | 2 +-
.../storage/durability/DurableStorage.java | 350 ++++++++
.../storage/durability/Persistence.java | 64 ++
.../storage/durability/ThriftBackfill.java | 175 ++++
.../storage/durability/TransactionRecorder.java | 122 +++
.../storage/durability/WriteAheadStorage.java | 368 ++++++++
.../scheduler/storage/log/LogPersistence.java | 257 ++++++
.../scheduler/storage/log/LogStorage.java | 576 ------------
.../scheduler/storage/log/LogStorageModule.java | 13 +-
.../storage/log/SnapshotStoreImpl.java | 1 +
.../scheduler/storage/log/StreamManager.java | 15 +-
.../storage/log/StreamManagerImpl.java | 46 +-
.../scheduler/storage/log/ThriftBackfill.java | 175 ----
.../storage/log/WriteAheadStorage.java | 369 --------
.../thrift/SchedulerThriftInterface.java | 8 +-
.../app/local/FakeNonVolatileStorage.java | 5 -
.../scheduler/app/local/LocalSchedulerMain.java | 13 +-
.../scheduler/storage/backup/RecoveryTest.java | 4 +-
.../storage/durability/DurableStorageTest.java | 781 ++++++++++++++++
.../storage/durability/ThriftBackfillTest.java | 222 +++++
.../durability/TransactionRecorderTest.java | 78 ++
.../durability/WriteAheadStorageTest.java | 166 ++++
.../scheduler/storage/log/LogManagerTest.java | 86 +-
.../scheduler/storage/log/LogStorageTest.java | 897 -------------------
.../storage/log/NonVolatileStorageTest.java | 5 +-
.../storage/log/SnapshotStoreImplIT.java | 1 +
.../storage/log/ThriftBackfillTest.java | 222 -----
.../storage/log/WriteAheadStorageTest.java | 165 ----
.../thrift/SchedulerThriftInterfaceTest.java | 8 +-
.../aurora/scheduler/thrift/ThriftIT.java | 2 +
38 files changed, 2687 insertions(+), 2550 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
index 3c12532..cdbe359 100644
--- a/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
+++ b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
@@ -217,7 +217,7 @@ public final class ThriftBinaryCodec {
/**
* Thrown when serialization or deserialization failed.
*/
- public static class CodingException extends Exception {
+ public static class CodingException extends RuntimeException {
public CodingException(String message) {
super(message);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
index 5fe7b9b..e1f20f4 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
@@ -47,10 +47,10 @@ import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings;
import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig;
import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
import org.apache.mesos.v1.Protos;
import org.apache.mesos.v1.Protos.ExecutorID;
import org.apache.mesos.v1.Protos.ExecutorInfo;
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
index fa2f39c..f3e98f2 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
@@ -40,6 +40,7 @@ import org.apache.aurora.scheduler.base.UserProvidedStrings;
import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
import org.apache.aurora.scheduler.resources.ResourceManager;
import org.apache.aurora.scheduler.resources.ResourceType;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IConstraint;
import org.apache.aurora.scheduler.storage.entities.IContainer;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
@@ -48,7 +49,6 @@ import org.apache.aurora.scheduler.storage.entities.IResource;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.entities.ITaskConstraint;
import org.apache.aurora.scheduler.storage.entities.IValueConstraint;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
import static java.util.Objects.requireNonNull;
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
index f9dee22..d093753 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
@@ -26,12 +26,12 @@ import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IResource;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
import org.apache.mesos.v1.Protos.Resource;
import static org.apache.aurora.scheduler.resources.ResourceType.BY_MESOS_NAME;
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
index 1b10ec5..25fd315 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
@@ -132,12 +132,6 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage {
return wrapped.write(work);
}
- @Override
- public void snapshot() throws StorageException {
- checkState(State.READY);
- wrapped.snapshot();
- }
-
/**
* Creates a binding module that will wrap a storage class with {@link CallOrderEnforcingStorage},
* exposing the order-enforced storage as {@link Storage} and {@link NonVolatileStorage}.
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java b/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
index 4ddee40..0c6a955 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
@@ -15,18 +15,25 @@ package org.apache.aurora.scheduler.storage;
import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
/**
* A distributed snapshot store that supports persisting globally-visible snapshots.
*/
public interface DistributedSnapshotStore {
+
+ /**
+ * Clean up the underlying storage by optimizing internal data structures. Does not change
+ * externally-visible state but might not run concurrently with write operations.
+ */
+ void snapshot() throws StorageException;
+
/**
- * Writes a snapshot to the distributed storage system.
- * TODO(William Farner): Currently we're hiding some exceptions (which happen to be
- * RuntimeExceptions). Clean these up to be checked, and throw another exception type here.
+ * Identical to {@link #snapshot()}, using a custom {@link Snapshot} rather than an
+ * internally-generated one based on the current state.
*
* @param snapshot Snapshot to write.
* @throws CodingException If the snapshot could not be serialized.
*/
- void persist(Snapshot snapshot) throws CodingException;
+ void snapshotWith(Snapshot snapshot) throws CodingException;
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 7d325b6..c9ea1de 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -196,10 +196,6 @@ public interface Storage {
* Executes the unit of read-only {@code work}. The consistency model creates the possibility
* for a reader to read uncommitted state from a concurrent writer.
* <p>
- * TODO(wfarner): Update this documentation once all stores are backed by
- * {@link org.apache.aurora.scheduler.storage.db.DbStorage}, as the concurrency behavior will then
- * be dictated by the {@link org.mybatis.guice.transactional.Transactional#isolation()} used.
- * <p>
* TODO(wfarner): This method no longer needs to exist now that there is no global locking for
* reads. We could instead directly inject the individual stores where they are used, as long
* as the stores have a layer to replicate what is currently done by
@@ -253,12 +249,6 @@ public interface Storage {
void start(MutateWork.NoResult.Quiet initializationLogic) throws StorageException;
/**
- * Clean up the underlying storage by optimizing internal data structures. Does not change
- * externally-visible state but might not run concurrently with write operations.
- */
- void snapshot() throws StorageException;
-
- /**
* Prepares the underlying storage system for clean shutdown.
*/
void stop();
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
index 6cd5b2b..3a62f02 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
@@ -197,7 +197,7 @@ public interface Recovery {
void commit() {
primaryStorage.write((NoResult.Quiet) storeProvider -> {
try {
- distributedStore.persist(tempStorage.toSnapshot());
+ distributedStore.snapshotWith(tempStorage.toSnapshot());
shutDownNow.execute();
} catch (CodingException e) {
throw new IllegalStateException("Failed to encode snapshot.", e);
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
index 3000796..18296b0 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -27,9 +27,9 @@ import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.storage.SnapshotStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import static java.util.Objects.requireNonNull;
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
new file mode 100644
index 0000000..85b2113
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.stats.SlidingStats;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.durability.Persistence.PersistenceException;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A storage implementation that ensures storage mutations are written to a persistence layer.
+ *
+ * <p>In the classic write-ahead log usage we'd perform mutations as follows:
+ * <ol>
+ *   <li>record op</li>
+ *   <li>perform op locally</li>
+ *   <li>persist ops</li>
+ * </ol>
+ *
+ * <p>Writing the operation to persistence ensures we have a record of our mutation in case we
+ * should need to recover state later after a crash or on a new host (assuming the scheduler is
+ * distributed).  We then apply the mutation to a local (in-memory) data structure for serving fast
+ * read requests.
+ *
+ * <p>This implementation leverages a local transaction to handle this:
+ * <ol>
+ *   <li>start local transaction</li>
+ *   <li>perform op locally (uncommitted!)</li>
+ *   <li>write op to persistence</li>
+ * </ol>
+ *
+ * <p>If the op fails to apply to local storage we will never persist the op, and if the op
+ * fails to persist, it'll throw and abort the local storage operation as well.
+ */
+public class DurableStorage implements NonVolatileStorage {
+
+  /**
+   * A maintainer for context about open transactions.  Assumes that an external entity is
+   * responsible for opening and closing transactions.
+   */
+  interface TransactionManager {
+
+    /**
+     * Checks whether there is an open transaction.
+     *
+     * @return {@code true} if there is an open transaction, {@code false} otherwise.
+     */
+    boolean hasActiveTransaction();
+
+    /**
+     * Adds an operation to the existing transaction.
+     *
+     * @param op Operation to include in the existing transaction.
+     */
+    void log(Op op);
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(DurableStorage.class);
+
+  private final Persistence persistence;
+  private final Storage writeBehindStorage;
+  private final SchedulerStore.Mutable writeBehindSchedulerStore;
+  private final CronJobStore.Mutable writeBehindJobStore;
+  private final TaskStore.Mutable writeBehindTaskStore;
+  private final QuotaStore.Mutable writeBehindQuotaStore;
+  private final AttributeStore.Mutable writeBehindAttributeStore;
+  private final JobUpdateStore.Mutable writeBehindJobUpdateStore;
+  private final ReentrantLock writeLock;
+  private final ThriftBackfill thriftBackfill;
+
+  // Store provider handed to mutate work once recovery has completed; see doInTransaction().
+  private final WriteAheadStorage writeAheadStorage;
+
+  // TODO(wfarner): It should be possible to remove this flag now, since all call stacks when
+  // recovering are controlled at this layer (they're all calls to Mutable store implementations).
+  // The more involved change is changing SnapshotStore to accept a Mutable store provider to
+  // avoid a call to Storage.write() when we replay a Snapshot.
+  private boolean recovered = false;
+
+  // The currently-open transaction, or null when none is open.  Assigned and cleared only in
+  // doInTransaction(), which is reached only through write() while writeLock is held.
+  private TransactionRecorder transaction = null;
+
+  private final SlidingStats writerWaitStats = new SlidingStats("storage_write_lock_wait", "ns");
+
+  // Dispatch table from op type to the action that replays it against the write-behind stores.
+  private final Map<Op._Fields, Consumer<Op>> transactionReplayActions;
+
+  @Inject
+  DurableStorage(
+      Persistence persistence,
+      @Volatile Storage delegateStorage,
+      @Volatile SchedulerStore.Mutable schedulerStore,
+      @Volatile CronJobStore.Mutable jobStore,
+      @Volatile TaskStore.Mutable taskStore,
+      @Volatile QuotaStore.Mutable quotaStore,
+      @Volatile AttributeStore.Mutable attributeStore,
+      @Volatile JobUpdateStore.Mutable jobUpdateStore,
+      EventSink eventSink,
+      ReentrantLock writeLock,
+      ThriftBackfill thriftBackfill) {
+
+    this.persistence = requireNonNull(persistence);
+
+    // DurableStorage has two distinct operating modes: pre- and post-recovery.  When recovering,
+    // we write directly to the writeBehind stores since we are replaying what's already persisted.
+    // After that, all writes must succeed in Persistence before they may be considered successful.
+    this.writeBehindStorage = requireNonNull(delegateStorage);
+    this.writeBehindSchedulerStore = requireNonNull(schedulerStore);
+    this.writeBehindJobStore = requireNonNull(jobStore);
+    this.writeBehindTaskStore = requireNonNull(taskStore);
+    this.writeBehindQuotaStore = requireNonNull(quotaStore);
+    this.writeBehindAttributeStore = requireNonNull(attributeStore);
+    this.writeBehindJobUpdateStore = requireNonNull(jobUpdateStore);
+    this.writeLock = requireNonNull(writeLock);
+    this.thriftBackfill = requireNonNull(thriftBackfill);
+    TransactionManager transactionManager = new TransactionManager() {
+      @Override
+      public boolean hasActiveTransaction() {
+        return transaction != null;
+      }
+
+      @Override
+      public void log(Op op) {
+        // No null guard: this dereferences the current transaction directly, so it throws a
+        // NullPointerException when no transaction is open.
+        transaction.add(op);
+      }
+    };
+    this.writeAheadStorage = new WriteAheadStorage(
+        transactionManager,
+        schedulerStore,
+        jobStore,
+        taskStore,
+        quotaStore,
+        attributeStore,
+        jobUpdateStore,
+        LoggerFactory.getLogger(WriteAheadStorage.class),
+        eventSink);
+
+    this.transactionReplayActions = buildTransactionReplayActions();
+  }
+
+  /**
+   * Builds the table of replay actions, keyed by op type.  Each action applies a recovered op
+   * directly to the corresponding write-behind store.
+   *
+   * @return An immutable mapping from op type to its replay action.
+   */
+  @VisibleForTesting
+  final Map<Op._Fields, Consumer<Op>> buildTransactionReplayActions() {
+    return ImmutableMap.<Op._Fields, Consumer<Op>>builder()
+        .put(
+            Op._Fields.SAVE_FRAMEWORK_ID,
+            op -> writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId()))
+        .put(Op._Fields.SAVE_CRON_JOB, op -> {
+          SaveCronJob cronJob = op.getSaveCronJob();
+          writeBehindJobStore.saveAcceptedJob(
+              thriftBackfill.backfillJobConfiguration(cronJob.getJobConfig()));
+        })
+        .put(
+            Op._Fields.REMOVE_JOB,
+            op -> writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey())))
+        .put(
+            Op._Fields.SAVE_TASKS,
+            op -> writeBehindTaskStore.saveTasks(
+                thriftBackfill.backfillTasks(op.getSaveTasks().getTasks())))
+        .put(
+            Op._Fields.REMOVE_TASKS,
+            op -> writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds()))
+        .put(Op._Fields.SAVE_QUOTA, op -> {
+          SaveQuota saveQuota = op.getSaveQuota();
+          writeBehindQuotaStore.saveQuota(
+              saveQuota.getRole(),
+              ThriftBackfill.backfillResourceAggregate(saveQuota.getQuota()));
+        })
+        .put(
+            Op._Fields.REMOVE_QUOTA,
+            op -> writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole()))
+        .put(Op._Fields.SAVE_HOST_ATTRIBUTES, op -> {
+          HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
+          // Prior to commit 5cf760b, the store would persist maintenance mode changes for
+          // unknown hosts.  5cf760b began rejecting these, but the storage may still
+          // contain entries with a null slave ID.
+          if (attributes.isSetSlaveId()) {
+            writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes));
+          } else {
+            LOG.info("Dropping host attributes with no agent ID: {}", attributes);
+          }
+        })
+        .put(Op._Fields.SAVE_JOB_UPDATE, op ->
+            writeBehindJobUpdateStore.saveJobUpdate(
+                thriftBackfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate())))
+        .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, op -> {
+          SaveJobUpdateEvent event = op.getSaveJobUpdateEvent();
+          writeBehindJobUpdateStore.saveJobUpdateEvent(
+              IJobUpdateKey.build(event.getKey()),
+              IJobUpdateEvent.build(event.getEvent()));
+        })
+        .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, op -> {
+          SaveJobInstanceUpdateEvent event = op.getSaveJobInstanceUpdateEvent();
+          writeBehindJobUpdateStore.saveJobInstanceUpdateEvent(
+              IJobUpdateKey.build(event.getKey()),
+              IJobInstanceUpdateEvent.build(event.getEvent()));
+        })
+        .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> {
+          LOG.info("Dropping prune operation.  Updates will be pruned later.");
+        })
+        .put(Op._Fields.REMOVE_JOB_UPDATE, op ->
+            writeBehindJobUpdateStore.removeJobUpdates(
+                IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys())))
+        .build();
+  }
+
+  /**
+   * Prepares the write-behind storage first, then the persistence layer.
+   */
+  @Override
+  @Timed("scheduler_storage_prepare")
+  public synchronized void prepare() {
+    writeBehindStorage.prepare();
+    persistence.prepare();
+  }
+
+  /**
+   * Replays all persisted ops into the write-behind stores, switches to post-recovery mode, and
+   * then runs {@code initializationLogic} in a transaction of this storage so that its mutations
+   * are persisted.
+   *
+   * @param initializationLogic Work to perform once recovery has completed.
+   */
+  @Override
+  @Timed("scheduler_storage_start")
+  public synchronized void start(final MutateWork.NoResult.Quiet initializationLogic) {
+    write((NoResult.Quiet) unused -> {
+      // Must have the underlying storage started so we can query it.
+      // We replay these entries in the forwarded storage system's transactions but not ours - we
+      // do not want to re-record these ops.
+      recover();
+      recovered = true;
+
+      // Now that we're recovered we should persist any mutations done in initializationLogic, so
+      // run it in one of our transactions.
+      write(initializationLogic);
+    });
+  }
+
+  @Override
+  public void stop() {
+    // No-op.
+  }
+
+  /**
+   * Replays every record returned by {@link Persistence#recover()}, in stream order.
+   *
+   * @throws RecoveryFailedException If the persistence layer fails to supply the records.
+   */
+  @Timed("scheduler_storage_recover")
+  void recover() throws RecoveryFailedException {
+    try {
+      persistence.recover().forEach(DurableStorage.this::replayOp);
+    } catch (PersistenceException e) {
+      throw new RecoveryFailedException(e);
+    }
+  }
+
+  /**
+   * Wraps a {@link PersistenceException} raised while recovering.
+   */
+  private static final class RecoveryFailedException extends SchedulerException {
+    RecoveryFailedException(Throwable cause) {
+      super(cause);
+    }
+  }
+
+  /**
+   * Applies a single recovered op using the replay action registered for its type.
+   *
+   * @param op The op to replay.
+   * @throws IllegalStateException If no replay action is registered for the op's set field.
+   */
+  private void replayOp(Op op) {
+    Op._Fields opField = op.getSetField();
+    // A single lookup doubles as the membership test; registered actions are never null.
+    Consumer<Op> replayAction = transactionReplayActions.get(opField);
+    if (replayAction == null) {
+      throw new IllegalStateException("Unknown transaction op: " + opField);
+    }
+
+    replayAction.accept(op);
+  }
+
+  /**
+   * Runs {@code work} against the write-ahead store provider.  When no transaction is open, this
+   * opens one, executes the work inside a write on the write-behind storage, and persists the
+   * ops that were recorded, if any.  When a transaction is already open, the work simply joins
+   * it.
+   *
+   * @param work The mutation to perform.
+   * @return The result of {@code work}.
+   * @throws StorageException If the recorded ops could not be persisted.
+   * @throws E If {@code work} throws.
+   */
+  private <T, E extends Exception> T doInTransaction(final MutateWork<T, E> work)
+      throws StorageException, E {
+
+    // The transaction has already been set up so we just need to delegate with our store provider
+    // so any mutations may be persisted.
+    if (transaction != null) {
+      return work.apply(writeAheadStorage);
+    }
+
+    transaction = new TransactionRecorder();
+    try {
+      return writeBehindStorage.write(unused -> {
+        T result = work.apply(writeAheadStorage);
+        List<Op> ops = transaction.getOps();
+        if (!ops.isEmpty()) {
+          try {
+            persistence.persist(ops.stream());
+          } catch (PersistenceException e) {
+            throw new StorageException("Failed to persist storage changes", e);
+          }
+        }
+        return result;
+      });
+    } finally {
+      // Always close the transaction, whether the work succeeded or threw.
+      transaction = null;
+    }
+  }
+
+  /**
+   * Performs {@code work} while holding the write lock, recording the time spent waiting for it.
+   * Before recovery completes the work is passed straight to the write-behind storage;
+   * afterwards it runs in a transaction whose ops are persisted.
+   *
+   * @param work The mutation to perform.
+   * @return The result of {@code work}.
+   * @throws StorageException If the storage operation fails.
+   * @throws E If {@code work} throws.
+   */
+  @Override
+  public <T, E extends Exception> T write(final MutateWork<T, E> work) throws StorageException, E {
+    long waitStart = System.nanoTime();
+    writeLock.lock();
+    try {
+      writerWaitStats.accumulate(System.nanoTime() - waitStart);
+      // We don't want to persist when recovering, we just want to update the underlying
+      // store - so pass mutations straight through to the underlying storage.
+      if (!recovered) {
+        return writeBehindStorage.write(work);
+      }
+
+      return doInTransaction(work);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Delegates read-only {@code work} to the write-behind storage without taking the write lock.
+   */
+  @Override
+  public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
+    return writeBehindStorage.read(work);
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
new file mode 100644
index 0000000..9eb862c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.stream.Stream;
+
+import org.apache.aurora.gen.storage.Op;
+
+/**
+ * A durable store for sequences of storage {@link Op}s.
+ */
+public interface Persistence {
+
+  /**
+   * Readies the persistence layer for use.  An implementation might, for instance, use this hook
+   * to announce itself as a replica to cohort schedulers, or to start syncing state for warm
+   * standby.
+   */
+  void prepare();
+
+  /**
+   * Reads back the records that were persisted earlier.
+   *
+   * @return Every persisted record.
+   * @throws PersistenceException If the records could not be recovered.
+   */
+  Stream<Op> recover() throws PersistenceException;
+
+  /**
+   * Stores new records.  Callers must not treat any record as durably saved until this method
+   * has returned successfully.
+   *
+   * @param records The records to store.
+   * @throws PersistenceException If storing the records failed.
+   */
+  void persist(Stream<Op> records) throws PersistenceException;
+
+  /**
+   * Signals that a persistence operation failed.
+   */
+  class PersistenceException extends Exception {
+    public PersistenceException(String msg, Throwable cause) {
+      super(msg, cause);
+    }
+
+    public PersistenceException(Throwable cause) {
+      super(cause);
+    }
+
+    public PersistenceException(String msg) {
+      super(msg);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java
new file mode 100644
index 0000000..4425d02
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.inject.Inject;
+
+import org.apache.aurora.GuavaUtils;
+import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.Resource;
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.TierInfo;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.quota.QuotaManager;
+import org.apache.aurora.scheduler.resources.ResourceType;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IResource;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
+
+/**
+ * Helps migrating thrift schema by populating deprecated and/or replacement fields.
+ */
+public final class ThriftBackfill {
+
+ private final TierManager tierManager;
+
+ @Inject
+ public ThriftBackfill(TierManager tierManager) {
+ this.tierManager = requireNonNull(tierManager);
+ }
+
+ private static Resource getResource(Set<Resource> resources, ResourceType type) {
+ return resources.stream()
+ .filter(e -> ResourceType.fromResource(IResource.build(e)).equals(type))
+ .findFirst()
+ .orElseThrow(() ->
+ new IllegalArgumentException("Missing resource definition for " + type));
+ }
+
+ /**
+ * Ensures TaskConfig.resources and correspondent task-level fields are all populated.
+ *
+ * @param config TaskConfig to backfill.
+ * @return Backfilled TaskConfig.
+ */
+ public TaskConfig backfillTask(TaskConfig config) {
+ backfillTier(config);
+ return config;
+ }
+
+ private void backfillTier(TaskConfig config) {
+ ITaskConfig taskConfig = ITaskConfig.build(config);
+ if (config.isSetTier()) {
+ TierInfo tier = tierManager.getTier(taskConfig);
+ config.setProduction(!tier.isPreemptible() && !tier.isRevocable());
+ } else {
+ config.setTier(tierManager.getTiers()
+ .entrySet()
+ .stream()
+ .filter(e -> e.getValue().isPreemptible() == !taskConfig.isProduction()
+ && !e.getValue().isRevocable())
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException(
+ format("No matching implicit tier for task of job %s", taskConfig.getJob())))
+ .getKey());
+ }
+ }
+
+ /**
+ * Backfills JobConfiguration. See {@link #backfillTask(TaskConfig)}.
+ *
+ * @param jobConfig JobConfiguration to backfill.
+ * @return Backfilled JobConfiguration.
+ */
+ public IJobConfiguration backfillJobConfiguration(JobConfiguration jobConfig) {
+ backfillTask(jobConfig.getTaskConfig());
+ return IJobConfiguration.build(jobConfig);
+ }
+
+ /**
+ * Backfills set of tasks. See {@link #backfillTask(TaskConfig)}.
+ *
+ * @param tasks Set of tasks to backfill.
+ * @return Backfilled set of tasks.
+ */
+ public Set<IScheduledTask> backfillTasks(Set<ScheduledTask> tasks) {
+ return tasks.stream()
+ .map(t -> backfillScheduledTask(t))
+ .map(IScheduledTask::build)
+ .collect(GuavaUtils.toImmutableSet());
+ }
+
+ /**
+ * Ensures ResourceAggregate.resources and correspondent deprecated fields are all populated.
+ *
+ * @param aggregate ResourceAggregate to backfill.
+ * @return Backfilled IResourceAggregate.
+ */
+ public static IResourceAggregate backfillResourceAggregate(ResourceAggregate aggregate) {
+ if (!aggregate.isSetResources() || aggregate.getResources().isEmpty()) {
+ aggregate.addToResources(Resource.numCpus(aggregate.getNumCpus()));
+ aggregate.addToResources(Resource.ramMb(aggregate.getRamMb()));
+ aggregate.addToResources(Resource.diskMb(aggregate.getDiskMb()));
+ } else {
+ EnumSet<ResourceType> quotaResources = QuotaManager.QUOTA_RESOURCE_TYPES;
+ if (aggregate.getResources().size() > quotaResources.size()) {
+ throw new IllegalArgumentException("Too many resource values in quota.");
+ }
+
+ if (!quotaResources.equals(aggregate.getResources().stream()
+ .map(e -> ResourceType.fromResource(IResource.build(e)))
+ .collect(Collectors.toSet()))) {
+
+ throw new IllegalArgumentException("Quota resources must be exactly: " + quotaResources);
+ }
+ aggregate.setNumCpus(
+ getResource(aggregate.getResources(), CPUS).getNumCpus());
+ aggregate.setRamMb(
+ getResource(aggregate.getResources(), RAM_MB).getRamMb());
+ aggregate.setDiskMb(
+ getResource(aggregate.getResources(), DISK_MB).getDiskMb());
+ }
+ return IResourceAggregate.build(aggregate);
+ }
+
+ private ScheduledTask backfillScheduledTask(ScheduledTask task) {
+ backfillTask(task.getAssignedTask().getTask());
+ return task;
+ }
+
+ /**
+ * Backfills JobUpdate. See {@link #backfillTask(TaskConfig)}.
+ *
+ * @param update JobUpdate to backfill.
+ * @return Backfilled job update.
+ */
+ public IJobUpdate backFillJobUpdate(JobUpdate update) {
+ JobUpdateInstructions instructions = update.getInstructions();
+ if (instructions.isSetDesiredState()) {
+ backfillTask(instructions.getDesiredState().getTask());
+ }
+
+ instructions.getInitialState().forEach(e -> backfillTask(e.getTask()));
+
+ return IJobUpdate.build(update);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java
new file mode 100644
index 0000000..1c811e3
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveTasks;
+
+/**
+ * Records a sequence of mutations to the storage.
+ */
+class TransactionRecorder {
+ private final List<Op> ops = Lists.newArrayList();
+
+ void add(Op op) {
+ Op prior = Iterables.getLast(ops, null);
+ if (prior == null || !coalesce(prior, op)) {
+ ops.add(op);
+ }
+ }
+
+ List<Op> getOps() {
+ return ops;
+ }
+
+ /**
+ * Tries to coalesce a new op into the prior to compact the binary representation and increase
+ * batching.
+ *
+ * @param prior The previous op.
+ * @param next The next op to be added.
+ * @return {@code true} if the next op was coalesced into the prior, {@code false} otherwise.
+ */
+ private boolean coalesce(Op prior, Op next) {
+ if (!prior.isSet() && !next.isSet()) {
+ return false;
+ }
+
+ Op._Fields priorType = prior.getSetField();
+ if (!priorType.equals(next.getSetField())) {
+ return false;
+ }
+
+ switch (priorType) {
+ case SAVE_FRAMEWORK_ID:
+ prior.setSaveFrameworkId(next.getSaveFrameworkId());
+ return true;
+ case SAVE_TASKS:
+ coalesce(prior.getSaveTasks(), next.getSaveTasks());
+ return true;
+ case REMOVE_TASKS:
+ coalesce(prior.getRemoveTasks(), next.getRemoveTasks());
+ return true;
+ case SAVE_HOST_ATTRIBUTES:
+ return coalesce(prior.getSaveHostAttributes(), next.getSaveHostAttributes());
+ default:
+ return false;
+ }
+ }
+
+ private void coalesce(SaveTasks prior, SaveTasks next) {
+ if (next.isSetTasks()) {
+ if (prior.isSetTasks()) {
+ // It is an expected invariant that an operation may reference a task (identified by
+ // task ID) no more than one time. Therefore, to coalesce two SaveTasks operations,
+ // the most recent task definition overrides the prior operation.
+ Map<String, ScheduledTask> coalesced = Maps.newHashMap();
+ for (ScheduledTask task : prior.getTasks()) {
+ coalesced.put(task.getAssignedTask().getTaskId(), task);
+ }
+ for (ScheduledTask task : next.getTasks()) {
+ coalesced.put(task.getAssignedTask().getTaskId(), task);
+ }
+ prior.setTasks(ImmutableSet.copyOf(coalesced.values()));
+ } else {
+ prior.setTasks(next.getTasks());
+ }
+ }
+ }
+
+ private void coalesce(RemoveTasks prior, RemoveTasks next) {
+ if (next.isSetTaskIds()) {
+ if (prior.isSetTaskIds()) {
+ prior.setTaskIds(ImmutableSet.<String>builder()
+ .addAll(prior.getTaskIds())
+ .addAll(next.getTaskIds())
+ .build());
+ } else {
+ prior.setTaskIds(next.getTaskIds());
+ }
+ }
+ }
+
+ private boolean coalesce(SaveHostAttributes prior, SaveHostAttributes next) {
+ if (prior.getHostAttributes().getHost().equals(next.getHostAttributes().getHost())) {
+ prior.getHostAttributes().setAttributes(next.getHostAttributes().getAttributes());
+ return true;
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
new file mode 100644
index 0000000..667db06
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveJob;
+import org.apache.aurora.gen.storage.RemoveQuota;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveFrameworkId;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdate;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.slf4j.Logger;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Mutable stores implementation that translates all operations to {@link Op}s (which are passed
+ * to a provided {@link TransactionManager}) before forwarding the operations to delegate mutable
+ * stores.
+ */
+public class WriteAheadStorage implements
+ MutableStoreProvider,
+ SchedulerStore.Mutable,
+ CronJobStore.Mutable,
+ TaskStore.Mutable,
+ QuotaStore.Mutable,
+ AttributeStore.Mutable,
+ JobUpdateStore.Mutable {
+
+ private final TransactionManager transactionManager;
+ private final SchedulerStore.Mutable schedulerStore;
+ private final CronJobStore.Mutable jobStore;
+ private final TaskStore.Mutable taskStore;
+ private final QuotaStore.Mutable quotaStore;
+ private final AttributeStore.Mutable attributeStore;
+ private final JobUpdateStore.Mutable jobUpdateStore;
+ private final Logger log;
+ private final EventSink eventSink;
+
+ /**
+ * Creates a new write-ahead storage that delegates to the providing default stores.
+ *
+ * @param transactionManager External controller for transaction operations.
+ * @param schedulerStore Delegate.
+ * @param jobStore Delegate.
+ * @param taskStore Delegate.
+ * @param quotaStore Delegate.
+ * @param attributeStore Delegate.
+ * @param jobUpdateStore Delegate.
+ */
+ public WriteAheadStorage(
+ TransactionManager transactionManager,
+ SchedulerStore.Mutable schedulerStore,
+ CronJobStore.Mutable jobStore,
+ TaskStore.Mutable taskStore,
+ QuotaStore.Mutable quotaStore,
+ AttributeStore.Mutable attributeStore,
+ JobUpdateStore.Mutable jobUpdateStore,
+ Logger log,
+ EventSink eventSink) {
+
+ this.transactionManager = requireNonNull(transactionManager);
+ this.schedulerStore = requireNonNull(schedulerStore);
+ this.jobStore = requireNonNull(jobStore);
+ this.taskStore = requireNonNull(taskStore);
+ this.quotaStore = requireNonNull(quotaStore);
+ this.attributeStore = requireNonNull(attributeStore);
+ this.jobUpdateStore = requireNonNull(jobUpdateStore);
+ this.log = requireNonNull(log);
+ this.eventSink = requireNonNull(eventSink);
+ }
+
+ private void write(Op op) {
+ Preconditions.checkState(
+ transactionManager.hasActiveTransaction(),
+ "Mutating operations must be within a transaction.");
+ transactionManager.log(op);
+ }
+
+ @Override
+ public void saveFrameworkId(final String frameworkId) {
+ requireNonNull(frameworkId);
+
+ write(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
+ schedulerStore.saveFrameworkId(frameworkId);
+ }
+
+ @Override
+ public void deleteTasks(final Set<String> taskIds) {
+ requireNonNull(taskIds);
+
+ write(Op.removeTasks(new RemoveTasks(taskIds)));
+ taskStore.deleteTasks(taskIds);
+ }
+
+ @Override
+ public void saveTasks(final Set<IScheduledTask> newTasks) {
+ requireNonNull(newTasks);
+
+ write(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
+ taskStore.saveTasks(newTasks);
+ }
+
+ @Override
+ public Optional<IScheduledTask> mutateTask(
+ String taskId,
+ Function<IScheduledTask, IScheduledTask> mutator) {
+
+ Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator);
+ log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus());
+ write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+
+ return mutated;
+ }
+
+ @Override
+ public void saveQuota(final String role, final IResourceAggregate quota) {
+ requireNonNull(role);
+ requireNonNull(quota);
+
+ write(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
+ quotaStore.saveQuota(role, quota);
+ }
+
+ @Override
+ public boolean saveHostAttributes(final IHostAttributes attrs) {
+ requireNonNull(attrs);
+
+ boolean changed = attributeStore.saveHostAttributes(attrs);
+ if (changed) {
+ write(Op.saveHostAttributes(new SaveHostAttributes(attrs.newBuilder())));
+ eventSink.post(new PubsubEvent.HostAttributesChanged(attrs));
+ }
+ return changed;
+ }
+
+ @Override
+ public void removeJob(final IJobKey jobKey) {
+ requireNonNull(jobKey);
+
+ write(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
+ jobStore.removeJob(jobKey);
+ }
+
+ @Override
+ public void saveAcceptedJob(final IJobConfiguration jobConfig) {
+ requireNonNull(jobConfig);
+
+ write(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())));
+ jobStore.saveAcceptedJob(jobConfig);
+ }
+
+ @Override
+ public void removeQuota(final String role) {
+ requireNonNull(role);
+
+ write(Op.removeQuota(new RemoveQuota(role)));
+ quotaStore.removeQuota(role);
+ }
+
+ @Override
+ public void saveJobUpdate(IJobUpdate update) {
+ requireNonNull(update);
+
+ write(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())));
+ jobUpdateStore.saveJobUpdate(update);
+ }
+
+ @Override
+ public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
+ requireNonNull(key);
+ requireNonNull(event);
+
+ write(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(event.newBuilder(), key.newBuilder())));
+ jobUpdateStore.saveJobUpdateEvent(key, event);
+ }
+
+ @Override
+ public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) {
+ requireNonNull(key);
+ requireNonNull(event);
+
+ write(Op.saveJobInstanceUpdateEvent(
+ new SaveJobInstanceUpdateEvent(event.newBuilder(), key.newBuilder())));
+ jobUpdateStore.saveJobInstanceUpdateEvent(key, event);
+ }
+
+ @Override
+ public void removeJobUpdates(Set<IJobUpdateKey> keys) {
+ requireNonNull(keys);
+
+ // Compatibility mode - RemoveJobUpdates is not yet written since older versions cannot
+ // read it. JobUpdates are only removed implicitly when a snapshot is taken.
+ jobUpdateStore.removeJobUpdates(keys);
+ }
+
+ @Override
+ public void deleteAllTasks() {
+ throw new UnsupportedOperationException(
+ "Unsupported since casual storage users should never be doing this.");
+ }
+
+ @Override
+ public void deleteHostAttributes() {
+ throw new UnsupportedOperationException(
+ "Unsupported since casual storage users should never be doing this.");
+ }
+
+ @Override
+ public void deleteJobs() {
+ throw new UnsupportedOperationException(
+ "Unsupported since casual storage users should never be doing this.");
+ }
+
+ @Override
+ public void deleteQuotas() {
+ throw new UnsupportedOperationException(
+ "Unsupported since casual storage users should never be doing this.");
+ }
+
+ @Override
+ public void deleteAllUpdates() {
+ throw new UnsupportedOperationException(
+ "Unsupported since casual storage users should never be doing this.");
+ }
+
+ @Override
+ public SchedulerStore.Mutable getSchedulerStore() {
+ return this;
+ }
+
+ @Override
+ public CronJobStore.Mutable getCronJobStore() {
+ return this;
+ }
+
+ @Override
+ public TaskStore.Mutable getUnsafeTaskStore() {
+ return this;
+ }
+
+ @Override
+ public QuotaStore.Mutable getQuotaStore() {
+ return this;
+ }
+
+ @Override
+ public AttributeStore.Mutable getAttributeStore() {
+ return this;
+ }
+
+ @Override
+ public TaskStore getTaskStore() {
+ return this;
+ }
+
+ @Override
+ public JobUpdateStore.Mutable getJobUpdateStore() {
+ return this;
+ }
+
+ @Override
+ public Optional<String> fetchFrameworkId() {
+ return this.schedulerStore.fetchFrameworkId();
+ }
+
+ @Override
+ public Iterable<IJobConfiguration> fetchJobs() {
+ return this.jobStore.fetchJobs();
+ }
+
+ @Override
+ public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) {
+ return this.jobStore.fetchJob(jobKey);
+ }
+
+ @Override
+ public Optional<IScheduledTask> fetchTask(String taskId) {
+ return this.taskStore.fetchTask(taskId);
+ }
+
+ @Override
+ public Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
+ return this.taskStore.fetchTasks(query);
+ }
+
+ @Override
+ public Set<IJobKey> getJobKeys() {
+ return this.taskStore.getJobKeys();
+ }
+
+ @Override
+ public Optional<IResourceAggregate> fetchQuota(String role) {
+ return this.quotaStore.fetchQuota(role);
+ }
+
+ @Override
+ public Map<String, IResourceAggregate> fetchQuotas() {
+ return this.quotaStore.fetchQuotas();
+ }
+
+ @Override
+ public Optional<IHostAttributes> getHostAttributes(String host) {
+ return this.attributeStore.getHostAttributes(host);
+ }
+
+ @Override
+ public Set<IHostAttributes> getHostAttributes() {
+ return this.attributeStore.getHostAttributes();
+ }
+
+ @Override
+ public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
+ return this.jobUpdateStore.fetchJobUpdates(query);
+ }
+
+ @Override
+ public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
+ return this.jobUpdateStore.fetchJobUpdate(key);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
new file mode 100644
index 0000000..a0a6b6c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
+import org.apache.aurora.common.application.ShutdownRegistry;
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.gen.storage.LogEntry;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
+import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
+import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.durability.Persistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Persistence layer that uses a replicated log.
+ */
+/**
+ * Persistence layer that uses a replicated log.
+ *
+ * <p>{@link #prepare()} opens the log stream that the other operations use, so it must be
+ * invoked before them.
+ */
+class LogPersistence implements Persistence, DistributedSnapshotStore {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LogPersistence.class);
+
+  private final LogManager logManager;
+  private final SnapshotStore<Snapshot> snapshotStore;
+  private final SchedulingService schedulingService;
+  private final Amount<Long, Time> snapshotInterval;
+  // Assigned by prepare(); unset until then.
+  private StreamManager streamManager;
+
+  @Inject
+  LogPersistence(
+      Settings settings,
+      LogManager logManager,
+      SnapshotStore<Snapshot> snapshotStore,
+      ShutdownRegistry shutdownRegistry) {
+
+    this(new ScheduledExecutorSchedulingService(
+            shutdownRegistry,
+            settings.getShutdownGracePeriod()),
+        settings.getSnapshotInterval(),
+        logManager,
+        snapshotStore);
+  }
+
+  @VisibleForTesting
+  LogPersistence(
+      SchedulingService schedulingService,
+      Amount<Long, Time> snapshotInterval,
+      LogManager logManager,
+      SnapshotStore<Snapshot> snapshotStore) {
+
+    this.schedulingService = requireNonNull(schedulingService);
+    this.snapshotInterval = requireNonNull(snapshotInterval);
+    this.logManager = requireNonNull(logManager);
+    this.snapshotStore = requireNonNull(snapshotStore);
+  }
+
+  /**
+   * Opens the log and keeps the resulting stream manager for later operations.
+   *
+   * @throws IllegalStateException If the log cannot be opened.
+   */
+  @Override
+  public void prepare() {
+    // Open the log to make a log replica available to the scheduler group.
+    try {
+      streamManager = logManager.open();
+    } catch (IOException e) {
+      throw new IllegalStateException("Failed to open the log, cannot continue", e);
+    }
+  }
+
+  /**
+   * Writes all supplied mutations to the log as a single transaction.
+   *
+   * @param mutations Ops to persist.
+   * @throws PersistenceException If the transaction cannot be encoded.
+   */
+  @Override
+  public void persist(Stream<Op> mutations) throws PersistenceException {
+    StreamTransaction transaction = streamManager.startTransaction();
+    mutations.forEach(transaction::add);
+    try {
+      transaction.commit();
+    } catch (CodingException e) {
+      throw new PersistenceException(e);
+    }
+  }
+
+  /**
+   * Schedules periodic snapshots and returns the ops stored in the log, read from the beginning.
+   *
+   * <p>The returned stream is lazy: log entries are interpreted (and any snapshot entries are
+   * applied to the snapshot store) only as the caller consumes the stream.
+   *
+   * @return Ops contained in the transaction entries of the log.
+   * @throws PersistenceException If reading from the log cannot be started.
+   */
+  @Override
+  public Stream<Op> recover() throws PersistenceException {
+    scheduleSnapshots();
+
+    try {
+      Iterator<LogEntry> entries = streamManager.readFromBeginning();
+      Iterable<LogEntry> iterableEntries = () -> entries;
+      return StreamSupport.stream(iterableEntries.spliterator(), false)
+          .flatMap(this::opsFrom);
+    } catch (CodingException | InvalidPositionException | StreamAccessException e) {
+      throw new PersistenceException(e);
+    }
+  }
+
+  /**
+   * Interprets a single log entry during recovery.
+   *
+   * <p>No-op entries yield nothing, snapshot entries are applied to the snapshot store and yield
+   * nothing, and transaction entries yield their ops.
+   *
+   * @param entry Log entry to interpret.
+   * @return Ops contributed by the entry, possibly none.
+   * @throws IllegalStateException If the entry is of any other type.
+   */
+  private Stream<Op> opsFrom(LogEntry entry) {
+    // Compared with == rather than a switch so that an unset field is reported as an unknown
+    // entry type instead of failing with a NullPointerException.
+    LogEntry._Fields type = entry.getSetField();
+    if (type == LogEntry._Fields.NOOP) {
+      return Stream.empty();
+    }
+    if (type == LogEntry._Fields.SNAPSHOT) {
+      Snapshot snapshot = entry.getSnapshot();
+      LOG.info("Applying snapshot taken on {}", new Date(snapshot.getTimestamp()));
+      snapshotStore.applySnapshot(snapshot);
+      return Stream.empty();
+    }
+    if (type == LogEntry._Fields.TRANSACTION) {
+      return entry.getTransaction().getOps().stream();
+    }
+    throw new IllegalStateException("Unknown log entry type: " + type);
+  }
+
+  /**
+   * Registers the periodic snapshot action, unless the configured interval is not positive.
+   * Storage failures during a scheduled snapshot are logged rather than propagated.
+   */
+  private void scheduleSnapshots() {
+    if (snapshotInterval.getValue() > 0) {
+      schedulingService.doEvery(snapshotInterval, () -> {
+        try {
+          snapshot();
+        } catch (StorageException e) {
+          if (e.getCause() == null) {
+            LOG.warn("StorageException when attempting to snapshot.", e);
+          } else {
+            LOG.warn(e.getMessage(), e.getCause());
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Creates and persists a snapshot, translating log failures into {@link StorageException}s.
+   *
+   * @throws StorageException If the snapshot cannot be encoded or written, or if entries
+   *     preceding it cannot be truncated.
+   */
+  @Override
+  public void snapshot() throws StorageException {
+    try {
+      doSnapshot();
+    } catch (CodingException e) {
+      throw new StorageException("Failed to encode a snapshot", e);
+    } catch (InvalidPositionException e) {
+      throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e);
+    } catch (StreamAccessException e) {
+      throw new StorageException("Failed to create a snapshot", e);
+    }
+  }
+
+  /**
+   * Writes the supplied snapshot to the log stream.
+   *
+   * @param snapshot Snapshot to persist.
+   * @throws CodingException If there is a problem encoding the snapshot.
+   * @throws InvalidPositionException If the log stream cursor is invalid.
+   * @throws StreamAccessException If there is a problem writing the snapshot to the log stream.
+   */
+  @Timed("scheduler_log_snapshot_persist")
+  @Override
+  public void snapshotWith(Snapshot snapshot)
+      throws CodingException, InvalidPositionException, StreamAccessException {
+
+    streamManager.snapshot(snapshot);
+  }
+
+  /**
+   * Forces a snapshot of the storage state.
+   *
+   * @throws CodingException If there is a problem encoding the snapshot.
+   * @throws InvalidPositionException If the log stream cursor is invalid.
+   * @throws StreamAccessException If there is a problem writing the snapshot to the log stream.
+   */
+  @Timed("scheduler_log_snapshot")
+  void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException {
+    LOG.info("Creating snapshot.");
+    Snapshot snapshot = snapshotStore.createSnapshot();
+    snapshotWith(snapshot);
+    LOG.info(
+        "Snapshot complete. host attrs: {}, cron jobs: {}, quota confs: {}, tasks: {},"
+            + " updates: {}",
+        snapshot.getHostAttributesSize(),
+        snapshot.getCronJobsSize(),
+        snapshot.getQuotaConfigurationsSize(),
+        snapshot.getTasksSize(),
+        snapshot.getJobUpdateDetailsSize());
+  }
+
+  /**
+   * A service that can schedule an action to be executed periodically.
+   */
+  @VisibleForTesting
+  interface SchedulingService {
+
+    /**
+     * Schedules an action to execute periodically.
+     *
+     * @param interval The time period to wait until running the {@code action} again.
+     * @param action The action to execute periodically.
+     */
+    void doEvery(Amount<Long, Time> interval, Runnable action);
+  }
+
+  /**
+   * Scheduling service backed by a single-threaded scheduled executor, which is shut down
+   * through an action registered with the shutdown registry.
+   */
+  private static class ScheduledExecutorSchedulingService implements SchedulingService {
+    private final ScheduledExecutorService scheduledExecutor;
+
+    ScheduledExecutorSchedulingService(ShutdownRegistry shutdownRegistry,
+        Amount<Long, Time> shutdownGracePeriod) {
+      scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor("LogStorage-%d", LOG);
+      shutdownRegistry.addAction(() -> MoreExecutors.shutdownAndAwaitTermination(
+          scheduledExecutor,
+          shutdownGracePeriod.getValue(),
+          shutdownGracePeriod.getUnit().getTimeUnit()));
+    }
+
+    @Override
+    public void doEvery(Amount<Long, Time> interval, Runnable action) {
+      requireNonNull(interval);
+      requireNonNull(action);
+
+      // The interval is used both as the initial delay and as the delay between runs.
+      long delay = interval.getValue();
+      TimeUnit timeUnit = interval.getUnit().getTimeUnit();
+      scheduledExecutor.scheduleWithFixedDelay(action, delay, delay, timeUnit);
+    }
+  }
+
+  /**
+   * Configuration settings for log persistence.
+   */
+  public static class Settings {
+    private final Amount<Long, Time> shutdownGracePeriod;
+    private final Amount<Long, Time> snapshotInterval;
+
+    Settings(Amount<Long, Time> shutdownGracePeriod, Amount<Long, Time> snapshotInterval) {
+      this.shutdownGracePeriod = requireNonNull(shutdownGracePeriod);
+      this.snapshotInterval = requireNonNull(snapshotInterval);
+    }
+
+    public Amount<Long, Time> getShutdownGracePeriod() {
+      return shutdownGracePeriod;
+    }
+
+    public Amount<Long, Time> getSnapshotInterval() {
+      return snapshotInterval;
+    }
+  }
+}
[2/4] aurora git commit: Extract a storage Persistence layer
Posted by wf...@apache.org.
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
new file mode 100644
index 0000000..07912b6
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
@@ -0,0 +1,781 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.JobInstanceUpdateEvent;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateAction;
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.JobUpdateSettings;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.Range;
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
+import org.apache.aurora.gen.storage.RemoveJob;
+import org.apache.aurora.gen.storage.RemoveJobUpdates;
+import org.apache.aurora.gen.storage.RemoveQuota;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveFrameworkId;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdate;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.resources.ResourceTestUtil;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.Resource.diskMb;
+import static org.apache.aurora.gen.Resource.numCpus;
+import static org.apache.aurora.gen.Resource.ramMb;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeConfig;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class DurableStorageTest extends EasyMockTest {
+
+ private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "name");
+ private static final IJobUpdateKey UPDATE_ID =
+ IJobUpdateKey.build(new JobUpdateKey(JOB_KEY.newBuilder(), "testUpdateId"));
+
+ private DurableStorage durableStorage;
+ private Persistence persistence;
+ private StorageTestUtil storageUtil;
+ private EventSink eventSink;
+
+ /**
+ * Builds the {@link DurableStorage} under test from a mocked {@link Persistence}, a mocked
+ * {@link EventSink} and the storage and stores exposed by {@link StorageTestUtil}.
+ */
+ @Before
+ public void setUp() {
+ persistence = createMock(Persistence.class);
+ storageUtil = new StorageTestUtil(this);
+ eventSink = createMock(EventSink.class);
+
+ durableStorage = new DurableStorage(
+ persistence,
+ storageUtil.storage,
+ storageUtil.schedulerStore,
+ storageUtil.jobStore,
+ storageUtil.taskStore,
+ storageUtil.quotaStore,
+ storageUtil.attributeStore,
+ storageUtil.jobUpdateStore,
+ eventSink,
+ new ReentrantLock(),
+ TaskTestUtil.THRIFT_BACKFILL);
+
+ // Invoked before any test calls control.replay(); presumably this records the expectation
+ // that the delegate storage is prepared - confirm against StorageTestUtil.
+ storageUtil.storage.prepare();
+ }
+
+ /**
+ * Exercises {@code prepare()} followed by {@code start()}: persistence is prepared, the ops
+ * built by {@link #buildReplayOps()} are recovered, and the supplied initialization logic runs.
+ * Finally checks that a replay action exists for every {@link Op} field.
+ */
+ @Test
+ public void testStart() throws Exception {
+ // We should initialize persistence.
+ persistence.prepare();
+
+ // Our start should recover persistence and then forward to the underlying storage start of the
+ // supplied initialization logic.
+ AtomicBoolean initialized = new AtomicBoolean(false);
+ MutateWork.NoResult.Quiet initializationLogic = provider -> {
+ // Creating a mock and expecting apply(storeProvider) does not work here for whatever
+ // reason.
+ initialized.set(true);
+ };
+
+ // First expected write: the captured work is run against the mutable store provider.
+ Capture<MutateWork.NoResult.Quiet> recoverAndInitializeWork = createCapture();
+ storageUtil.storage.write(capture(recoverAndInitializeWork));
+ expectLastCall().andAnswer(() -> {
+ recoverAndInitializeWork.getValue().apply(storageUtil.mutableStoreProvider);
+ return null;
+ });
+
+ // Second expected write: again the captured work is run against the mutable store provider.
+ Capture<MutateWork<Void, RuntimeException>> initializationWork = createCapture();
+ expect(storageUtil.storage.write(capture(initializationWork))).andAnswer(
+ () -> {
+ initializationWork.getValue().apply(storageUtil.mutableStoreProvider);
+ return null;
+ });
+
+ // Populate all Op types.
+ buildReplayOps();
+
+ control.replay();
+
+ durableStorage.prepare();
+ durableStorage.start(initializationLogic);
+ assertTrue(initialized.get());
+
+ // Assert all Transaction types have handlers defined.
+ assertEquals(
+ EnumSet.allOf(Op._Fields.class),
+ EnumSet.copyOf(durableStorage.buildTransactionReplayActions().keySet()));
+ }
+
+ /**
+ * Builds one op of each kind exercised by {@link #testStart()}, records the store call expected
+ * when that op is replayed (where one is expected), and makes {@code persistence.recover()}
+ * return the ops as a stream.
+ */
+ private void buildReplayOps() throws Exception {
+ ImmutableSet.Builder<Op> builder = ImmutableSet.builder();
+
+ builder.add(Op.saveFrameworkId(new SaveFrameworkId("bob")));
+ storageUtil.schedulerStore.saveFrameworkId("bob");
+
+ // Cron job: the op carries the non-backfilled config, the store expects the regular config.
+ JobConfiguration actualJob = new JobConfiguration().setTaskConfig(nonBackfilledConfig());
+ JobConfiguration expectedJob =
+ new JobConfiguration().setTaskConfig(makeConfig(JOB_KEY).newBuilder());
+ SaveCronJob cronJob = new SaveCronJob().setJobConfig(actualJob);
+ builder.add(Op.saveCronJob(cronJob));
+ storageUtil.jobStore.saveAcceptedJob(IJobConfiguration.build(expectedJob));
+
+ RemoveJob removeJob = new RemoveJob(JOB_KEY.newBuilder());
+ builder.add(Op.removeJob(removeJob));
+ storageUtil.jobStore.removeJob(JOB_KEY);
+
+ // Task: same pattern as the cron job - non-backfilled config in the op, regular task expected.
+ ScheduledTask actualTask = makeTask("id", JOB_KEY).newBuilder();
+ actualTask.getAssignedTask().setTask(nonBackfilledConfig());
+ IScheduledTask expectedTask = makeTask("id", JOB_KEY);
+ SaveTasks saveTasks = new SaveTasks(ImmutableSet.of(actualTask));
+ builder.add(Op.saveTasks(saveTasks));
+ storageUtil.taskStore.saveTasks(ImmutableSet.of(expectedTask));
+
+ RemoveTasks removeTasks = new RemoveTasks(ImmutableSet.of("taskId1"));
+ builder.add(Op.removeTasks(removeTasks));
+ storageUtil.taskStore.deleteTasks(removeTasks.getTaskIds());
+
+ // Quota: the op has only the scalar fields set; the store is expected to receive the same
+ // aggregate with the resources set populated as well.
+ ResourceAggregate nonBackfilled = new ResourceAggregate()
+ .setNumCpus(1.0)
+ .setRamMb(32)
+ .setDiskMb(64);
+ SaveQuota saveQuota = new SaveQuota(JOB_KEY.getRole(), nonBackfilled);
+ builder.add(Op.saveQuota(saveQuota));
+ storageUtil.quotaStore.saveQuota(
+ saveQuota.getRole(),
+ IResourceAggregate.build(nonBackfilled.deepCopy()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))));
+
+ builder.add(Op.removeQuota(new RemoveQuota(JOB_KEY.getRole())));
+ storageUtil.quotaStore.removeQuota(JOB_KEY.getRole());
+
+ // This entry lacks a slave ID, and should therefore be discarded.
+ SaveHostAttributes hostAttributes1 = new SaveHostAttributes(new HostAttributes()
+ .setHost("host1")
+ .setMode(MaintenanceMode.DRAINED));
+ builder.add(Op.saveHostAttributes(hostAttributes1));
+
+ SaveHostAttributes hostAttributes2 = new SaveHostAttributes(new HostAttributes()
+ .setHost("host2")
+ .setSlaveId("slave2")
+ .setMode(MaintenanceMode.DRAINED));
+ builder.add(Op.saveHostAttributes(hostAttributes2));
+ expect(storageUtil.attributeStore.saveHostAttributes(
+ IHostAttributes.build(hostAttributes2.getHostAttributes()))).andReturn(true);
+
+ // Job update: initial and desired state carry the non-backfilled config in the op, while the
+ // store is expected to receive the regular config in both places.
+ JobUpdate actualUpdate = new JobUpdate()
+ .setSummary(new JobUpdateSummary().setKey(UPDATE_ID.newBuilder()))
+ .setInstructions(new JobUpdateInstructions()
+ .setInitialState(
+ ImmutableSet.of(new InstanceTaskConfig().setTask(nonBackfilledConfig())))
+ .setDesiredState(new InstanceTaskConfig().setTask(nonBackfilledConfig())));
+ JobUpdate expectedUpdate = actualUpdate.deepCopy();
+ expectedUpdate.getInstructions().getDesiredState().setTask(makeConfig(JOB_KEY).newBuilder());
+ expectedUpdate.getInstructions().getInitialState()
+ .forEach(e -> e.setTask(makeConfig(JOB_KEY).newBuilder()));
+ SaveJobUpdate saveUpdate = new SaveJobUpdate().setJobUpdate(actualUpdate);
+ builder.add(Op.saveJobUpdate(saveUpdate));
+ storageUtil.jobUpdateStore.saveJobUpdate(IJobUpdate.build(expectedUpdate));
+
+ SaveJobUpdateEvent saveUpdateEvent =
+ new SaveJobUpdateEvent(new JobUpdateEvent(), UPDATE_ID.newBuilder());
+ builder.add(Op.saveJobUpdateEvent(saveUpdateEvent));
+ storageUtil.jobUpdateStore.saveJobUpdateEvent(
+ UPDATE_ID,
+ IJobUpdateEvent.build(saveUpdateEvent.getEvent()));
+
+ SaveJobInstanceUpdateEvent saveInstanceEvent = new SaveJobInstanceUpdateEvent(
+ new JobInstanceUpdateEvent(),
+ UPDATE_ID.newBuilder());
+ builder.add(Op.saveJobInstanceUpdateEvent(saveInstanceEvent));
+ storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
+ UPDATE_ID,
+ IJobInstanceUpdateEvent.build(saveInstanceEvent.getEvent()));
+
+ builder.add(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory(5, 10L)));
+ // No expectation - this op is ignored.
+
+ builder.add(Op.removeJobUpdate(
+ new RemoveJobUpdates().setKeys(ImmutableSet.of(UPDATE_ID.newBuilder()))));
+ storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(UPDATE_ID));
+
+ // Recovery hands the ops built above to the storage under test.
+ expect(persistence.recover()).andReturn(builder.build().stream());
+ }
+
+ /**
+  * Returns the task config that the recovery ops carry before any backfill is applied.
+  *
+  * <p>At present this is simply the builder form of {@code makeConfig(JOB_KEY)}; extend this
+  * method when more fields have to be backfilled.
+  */
+ private TaskConfig nonBackfilledConfig() {
+   TaskConfig config = makeConfig(JOB_KEY).newBuilder();
+   return config;
+ }
+
+ /**
+ * Base fixture for tests that exercise {@code durableStorage} after it has been prepared and
+ * started.
+ *
+ * <p>{@code run()} records the common start-up expectations, then the test-specific ones from
+ * {@code setupExpectations()}, replays the mock control, starts the storage and finally calls
+ * {@code runTest()}.
+ */
+ abstract class AbstractStorageFixture {
+ // Set by run() and checked by the tear-down registered in the constructor.
+ private final AtomicBoolean runCalled = new AtomicBoolean(false);
+
+ AbstractStorageFixture() {
+ // Prevent otherwise silent noop tests that forget to call run().
+ addTearDown(new TearDown() {
+ @Override
+ public void tearDown() {
+ assertTrue(runCalled.get());
+ }
+ });
+ }
+
+ /**
+ * Records all expectations, replays the mocks, starts the storage and runs the test body.
+ */
+ void run() throws Exception {
+ runCalled.set(true);
+
+ // Expect basic start operations.
+
+ // Initialize persistence.
+ persistence.prepare();
+
+ // Replay the ops and perform any supplied initializationWork.
+ // Simulate NOOP initialization work
+ // Creating a mock and expecting apply(storeProvider) does not work here for whatever
+ // reason.
+ MutateWork.NoResult.Quiet initializationLogic = storeProvider -> {
+ // No-op.
+ };
+
+ // When the mocked storage receives the write, run the captured work against the mutable
+ // store provider so the code under test actually executes.
+ Capture<MutateWork.NoResult.Quiet> recoverAndInitializeWork = createCapture();
+ storageUtil.storage.write(capture(recoverAndInitializeWork));
+ expectLastCall().andAnswer(() -> {
+ recoverAndInitializeWork.getValue().apply(storageUtil.mutableStoreProvider);
+ return null;
+ });
+
+ // Default recovery: persistence yields no ops. The second expected write is answered the
+ // same way as the first, by applying the captured work.
+ expect(persistence.recover()).andReturn(Stream.empty());
+ Capture<MutateWork<Void, RuntimeException>> recoveryWork = createCapture();
+ expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer(
+ () -> {
+ recoveryWork.getValue().apply(storageUtil.mutableStoreProvider);
+ return null;
+ });
+
+ // Setup custom test expectations.
+ setupExpectations();
+
+ control.replay();
+
+ // Start the system.
+ durableStorage.prepare();
+ durableStorage.start(initializationLogic);
+
+ // Exercise the system.
+ runTest();
+ }
+
+ /**
+ * Hook for recording test-specific expectations; called by run() before the mocks are
+ * replayed.
+ */
+ protected void setupExpectations() throws Exception {
+ // Default to no expectations.
+ }
+
+ /**
+ * The test body; called by run() after the storage has been prepared and started.
+ */
+ protected abstract void runTest();
+ }
+
+ abstract class AbstractMutationFixture extends AbstractStorageFixture {
+ @Override
+ protected void runTest() {
+ durableStorage.write((Quiet) AbstractMutationFixture.this::performMutations);
+ }
+
+ protected abstract void performMutations(MutableStoreProvider storeProvider);
+ }
+
+ /**
+  * Expects one {@code persistence.persist(...)} call and checks, when the call happens, that
+  * the persisted stream holds exactly {@code op} followed by {@code ops}, in that order.
+  */
+ private void expectPersist(Op op, Op... ops) {
+   try {
+     // Workaround for comparing streams: accept any argument here and inspect its contents
+     // in the answer below.
+     persistence.persist(anyObject());
+   } catch (Persistence.PersistenceException e) {
+     throw new RuntimeException(e);
+   }
+
+   IAnswer<Void> verifyPersistedOps = () -> {
+     ImmutableList<Op> expectedOps = ImmutableList.<Op>builder().add(op).add(ops).build();
+     Stream<Op> persistedOps = (Stream<Op>) EasyMock.getCurrentArguments()[0];
+     assertEquals(expectedOps, persistedOps.collect(Collectors.toList()));
+     return null;
+   };
+   expectLastCall().andAnswer(verifyPersistedOps);
+ }
+
+ /**
+ * Saving a framework ID is expected to reach the scheduler store and to be persisted as a
+ * {@code SaveFrameworkId} op.
+ */
+ @Test
+ public void testSaveFrameworkId() throws Exception {
+ String frameworkId = "bob";
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.schedulerStore.saveFrameworkId(frameworkId);
+ expectPersist(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getSchedulerStore().saveFrameworkId(frameworkId);
+ }
+ }.run();
+ }
+
+ /**
+ * Saving an accepted job through the cron job store is expected to reach the job store and
+ * to be persisted as a {@code SaveCronJob} op.
+ */
+ @Test
+ public void testSaveAcceptedJob() throws Exception {
+ IJobConfiguration jobConfig =
+ IJobConfiguration.build(new JobConfiguration().setKey(JOB_KEY.newBuilder()));
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.jobStore.saveAcceptedJob(jobConfig);
+ expectPersist(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getCronJobStore().saveAcceptedJob(jobConfig);
+ }
+ }.run();
+ }
+
+ /**
+ * Removing a job through the cron job store is expected to reach the job store and to be
+ * persisted as a {@code RemoveJob} op for the same key.
+ */
+ @Test
+ public void testRemoveJob() throws Exception {
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.jobStore.removeJob(JOB_KEY);
+ expectPersist(Op.removeJob(new RemoveJob().setJobKey(JOB_KEY.newBuilder())));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getCronJobStore().removeJob(JOB_KEY);
+ }
+ }.run();
+ }
+
+ /**
+ * Saving tasks is expected to reach the task store and to be persisted as a
+ * {@code SaveTasks} op holding the builder form of the same tasks.
+ */
+ @Test
+ public void testSaveTasks() throws Exception {
+ Set<IScheduledTask> tasks = ImmutableSet.of(task("a", ScheduleStatus.INIT));
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.taskStore.saveTasks(tasks);
+ expectPersist(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(tasks))));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().saveTasks(tasks);
+ }
+ }.run();
+ }
+
+ /**
+ * Mutating a task is expected to return the task store's result and to persist that result
+ * as a {@code SaveTasks} op.
+ */
+ @Test
+ public void testMutateTasks() throws Exception {
+ String taskId = "fred";
+ Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
+ Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
+ expectPersist(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+ }
+ }.run();
+ }
+
+ /**
+ * A write issued from inside another write is expected to end up in the same persist call:
+ * the outer mutation and the inner deletion are persisted together as a {@code SaveTasks} op
+ * followed by a {@code RemoveTasks} op.
+ */
+ @Test
+ public void testNestedTransactions() throws Exception {
+ String taskId = "fred";
+ Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
+ Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
+ ImmutableSet<String> tasksToRemove = ImmutableSet.of("b");
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
+
+ storageUtil.taskStore.deleteTasks(tasksToRemove);
+
+ expectPersist(
+ Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))),
+ Op.removeTasks(new RemoveTasks(tasksToRemove)));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+
+ // Nested write, issued while the outer write is still in progress.
+ durableStorage.write((NoResult.Quiet)
+ innerProvider -> innerProvider.getUnsafeTaskStore().deleteTasks(tasksToRemove));
+ }
+ }.run();
+ }
+
+ /**
+ * Saves task "a" and then mutates it within the same write. Both versions share one task ID,
+ * and only the mutated version is expected in the single persisted {@code SaveTasks} op.
+ */
+ @Test
+ public void testSaveAndMutateTasks() throws Exception {
+ String taskId = "fred";
+ Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
+ Set<IScheduledTask> saved = ImmutableSet.of(task("a", ScheduleStatus.INIT));
+ Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.taskStore.saveTasks(saved);
+
+ // Nested transaction with result.
+ expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
+
+ // Resulting stream operation.
+ expectPersist(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().saveTasks(saved);
+ assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+ }
+ }.run();
+ }
+
+ /**
+ * Saves task "b" and mutates task "a" within the same write. The task IDs differ, so the
+ * single persisted {@code SaveTasks} op is expected to contain both tasks.
+ */
+ @Test
+ public void testSaveAndMutateTasksNoCoalesceUniqueIds() throws Exception {
+ String taskId = "fred";
+ Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
+ Set<IScheduledTask> saved = ImmutableSet.of(task("b", ScheduleStatus.INIT));
+ Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.taskStore.saveTasks(saved);
+
+ // Nested transaction with result.
+ expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
+
+ // Resulting stream operation.
+ expectPersist(Op.saveTasks(new SaveTasks(
+ ImmutableSet.<ScheduledTask>builder()
+ .addAll(IScheduledTask.toBuildersList(saved))
+ .add(mutated.get().newBuilder())
+ .build())));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().saveTasks(saved);
+ assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+ }
+ }.run();
+ }
+
+ /**
+ * Deleting tasks by IDs derived from a task object (via {@code Tasks.ids}) is expected to
+ * reach the task store and to be persisted as a {@code RemoveTasks} op.
+ */
+ @Test
+ public void testRemoveTasksQuery() throws Exception {
+ IScheduledTask task = task("a", ScheduleStatus.FINISHED);
+ Set<String> taskIds = Tasks.ids(task);
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.taskStore.deleteTasks(taskIds);
+ expectPersist(Op.removeTasks(new RemoveTasks(taskIds)));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
+ }
+ }.run();
+ }
+
+ /**
+ * Deleting tasks by an explicit set of IDs is expected to reach the task store and to be
+ * persisted as a {@code RemoveTasks} op with the same IDs.
+ */
+ @Test
+ public void testRemoveTasksIds() throws Exception {
+ Set<String> taskIds = ImmutableSet.of("42");
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.taskStore.deleteTasks(taskIds);
+ expectPersist(Op.removeTasks(new RemoveTasks(taskIds)));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
+ }
+ }.run();
+ }
+
+ /**
+ * Saving a quota for a role is expected to reach the quota store and to be persisted as a
+ * {@code SaveQuota} op.
+ */
+ @Test
+ public void testSaveQuota() throws Exception {
+ String role = "role";
+ IResourceAggregate quota = ResourceTestUtil.aggregate(1.0, 128L, 1024L);
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.quotaStore.saveQuota(role, quota);
+ expectPersist(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getQuotaStore().saveQuota(role, quota);
+ }
+ }.run();
+ }
+
+ /**
+ * Removing a role's quota is expected to reach the quota store and to be persisted as a
+ * {@code RemoveQuota} op.
+ */
+ @Test
+ public void testRemoveQuota() throws Exception {
+ String role = "role";
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.quotaStore.removeQuota(role);
+ expectPersist(Op.removeQuota(new RemoveQuota(role)));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getQuotaStore().removeQuota(role);
+ }
+ }.run();
+ }
+
+ /**
+ * Saves the same host attributes twice. Only the first save, which the attribute store
+ * reports as a change ({@code true}), is expected to post a {@code HostAttributesChanged}
+ * event and persist a {@code SaveHostAttributes} op; the second save returns {@code false}
+ * and has no such expectations.
+ */
+ @Test
+ public void testSaveHostAttributes() throws Exception {
+ String host = "hostname";
+ Set<Attribute> attributes =
+ ImmutableSet.of(new Attribute().setName("attr").setValues(ImmutableSet.of("value")));
+ Optional<IHostAttributes> hostAttributes = Optional.of(
+ IHostAttributes.build(new HostAttributes()
+ .setHost(host)
+ .setAttributes(attributes)));
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ // Lookup before anything has been saved.
+ expect(storageUtil.attributeStore.getHostAttributes(host))
+ .andReturn(Optional.absent());
+
+ expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes);
+
+ // First save: reported as a change.
+ expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get())).andReturn(true);
+ eventSink.post(new PubsubEvent.HostAttributesChanged(hostAttributes.get()));
+ expectPersist(
+ Op.saveHostAttributes(new SaveHostAttributes(hostAttributes.get().newBuilder())));
+
+ // Second save: reported as no change.
+ expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get()))
+ .andReturn(false);
+
+ expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes);
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ AttributeStore.Mutable store = storeProvider.getAttributeStore();
+ assertEquals(Optional.absent(), store.getHostAttributes(host));
+
+ assertTrue(store.saveHostAttributes(hostAttributes.get()));
+
+ assertEquals(hostAttributes, store.getHostAttributes(host));
+
+ assertFalse(store.saveHostAttributes(hostAttributes.get()));
+
+ assertEquals(hostAttributes, store.getHostAttributes(host));
+ }
+ }.run();
+ }
+
+ /**
+ * Saving a job update is expected to reach the job update store and to be persisted as a
+ * {@code SaveJobUpdate} op holding the builder form of the same update.
+ */
+ @Test
+ public void testSaveUpdate() throws Exception {
+ IJobUpdate update = IJobUpdate.build(new JobUpdate()
+ .setSummary(new JobUpdateSummary()
+ .setKey(UPDATE_ID.newBuilder())
+ .setUser("user"))
+ .setInstructions(new JobUpdateInstructions()
+ .setDesiredState(new InstanceTaskConfig()
+ .setTask(new TaskConfig())
+ .setInstances(ImmutableSet.of(new Range(0, 3))))
+ .setInitialState(ImmutableSet.of(new InstanceTaskConfig()
+ .setTask(new TaskConfig())
+ .setInstances(ImmutableSet.of(new Range(0, 3)))))
+ .setSettings(new JobUpdateSettings())));
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.jobUpdateStore.saveJobUpdate(update);
+ expectPersist(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getJobUpdateStore().saveJobUpdate(update);
+ }
+ }.run();
+ }
+
+ /**
+ * Saving a job update event is expected to reach the job update store and to be persisted
+ * as a {@code SaveJobUpdateEvent} op carrying the event and the update key.
+ */
+ @Test
+ public void testSaveJobUpdateEvent() throws Exception {
+ IJobUpdateEvent event = IJobUpdateEvent.build(new JobUpdateEvent()
+ .setStatus(JobUpdateStatus.ROLLING_BACK)
+ .setTimestampMs(12345L));
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.jobUpdateStore.saveJobUpdateEvent(UPDATE_ID, event);
+ expectPersist(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(
+ event.newBuilder(),
+ UPDATE_ID.newBuilder())));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getJobUpdateStore().saveJobUpdateEvent(UPDATE_ID, event);
+ }
+ }.run();
+ }
+
+ /**
+ * Saving a job instance update event is expected to reach the job update store and to be
+ * persisted as a {@code SaveJobInstanceUpdateEvent} op carrying the event and the update key.
+ */
+ @Test
+ public void testSaveJobInstanceUpdateEvent() throws Exception {
+ IJobInstanceUpdateEvent event = IJobInstanceUpdateEvent.build(new JobInstanceUpdateEvent()
+ .setAction(JobUpdateAction.INSTANCE_ROLLING_BACK)
+ .setTimestampMs(12345L)
+ .setInstanceId(0));
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(UPDATE_ID, event);
+ expectPersist(Op.saveJobInstanceUpdateEvent(
+ new SaveJobInstanceUpdateEvent(
+ event.newBuilder(),
+ UPDATE_ID.newBuilder())));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getJobUpdateStore().saveJobInstanceUpdateEvent(UPDATE_ID, event);
+ }
+ }.run();
+ }
+
+ /**
+ * Removing job updates is expected to reach the job update store. No persist call is
+ * expected for this operation.
+ */
+ @Test
+ public void testRemoveJobUpdates() throws Exception {
+ IJobUpdateKey key = IJobUpdateKey.build(new JobUpdateKey()
+ .setJob(JOB_KEY.newBuilder())
+ .setId("update-id"));
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(key));
+
+ // No transaction is generated since this version is currently in 'read-only'
+ // compatibility mode for this operation type.
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getJobUpdateStore().removeJobUpdates(ImmutableSet.of(key));
+ }
+ }.run();
+ }
+
+ /** Creates an immutable scheduled task with the given ID and status and an empty config. */
+ private static IScheduledTask task(String id, ScheduleStatus status) {
+   AssignedTask assignedTask = new AssignedTask()
+       .setTaskId(id)
+       .setTask(new TaskConfig());
+   ScheduledTask builder = new ScheduledTask()
+       .setStatus(status)
+       .setAssignedTask(assignedTask);
+   return IScheduledTask.build(builder);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java
new file mode 100644
index 0000000..219576b
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.Resource.diskMb;
+import static org.apache.aurora.gen.Resource.namedPort;
+import static org.apache.aurora.gen.Resource.numCpus;
+import static org.apache.aurora.gen.Resource.ramMb;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the task-config and resource-aggregate backfill performed by {@code ThriftBackfill},
+ * using a mocked {@code TierManager}.
+ */
+public class ThriftBackfillTest extends EasyMockTest {
+
+ private ThriftBackfill thriftBackfill;
+ private TierManager tierManager;
+
+ @Before
+ public void setUp() {
+ tierManager = createMock(TierManager.class);
+ thriftBackfill = new ThriftBackfill(tierManager);
+ }
+
+ /**
+ * A non-production task whose named tier resolves to {@code DEV_TIER} is expected to come
+ * back equal to the input.
+ */
+ @Test
+ public void testFieldsToSetNoPorts() {
+ TaskConfig config = new TaskConfig()
+ .setResources(ImmutableSet.of(
+ numCpus(1.0),
+ ramMb(32),
+ diskMb(64)))
+ .setProduction(false)
+ .setTier("tierName");
+ TaskConfig expected = config.deepCopy()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+ expect(tierManager.getTier(ITaskConfig.build(expected))).andReturn(TaskTestUtil.DEV_TIER);
+
+ control.replay();
+
+ assertEquals(
+ expected,
+ thriftBackfill.backfillTask(config));
+ }
+
+ /**
+ * An aggregate with only the numCpus/ramMb/diskMb fields set is expected to gain a matching
+ * resources set.
+ */
+ @Test
+ public void testResourceAggregateFieldsToSet() {
+ control.replay();
+
+ ResourceAggregate aggregate = new ResourceAggregate()
+ .setNumCpus(1.0)
+ .setRamMb(32)
+ .setDiskMb(64);
+
+ IResourceAggregate expected = IResourceAggregate.build(aggregate.deepCopy()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64))));
+
+ assertEquals(expected, ThriftBackfill.backfillResourceAggregate(aggregate));
+ }
+
+ /**
+ * An aggregate with only the resources set populated is expected to gain matching
+ * numCpus/ramMb/diskMb fields.
+ */
+ @Test
+ public void testResourceAggregateSetToFields() {
+ control.replay();
+
+ ResourceAggregate aggregate = new ResourceAggregate()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+ IResourceAggregate expected = IResourceAggregate.build(aggregate.deepCopy()
+ .setNumCpus(1.0)
+ .setRamMb(32)
+ .setDiskMb(64));
+
+ assertEquals(expected, ThriftBackfill.backfillResourceAggregate(aggregate));
+ }
+
+ /** A resources set with two numCpus entries is expected to be rejected. */
+ @Test(expected = IllegalArgumentException.class)
+ public void testResourceAggregateTooManyResources() {
+ control.replay();
+
+ ResourceAggregate aggregate = new ResourceAggregate()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64), numCpus(2.0)));
+ ThriftBackfill.backfillResourceAggregate(aggregate);
+ }
+
+ /** A resources set holding a named port in place of diskMb is expected to be rejected. */
+ @Test(expected = IllegalArgumentException.class)
+ public void testResourceAggregateInvalidResources() {
+ control.replay();
+
+ ResourceAggregate aggregate = new ResourceAggregate()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), namedPort("http")));
+ ThriftBackfill.backfillResourceAggregate(aggregate);
+ }
+
+ /** A resources set without a diskMb entry is expected to be rejected. */
+ @Test(expected = IllegalArgumentException.class)
+ public void testResourceAggregateMissingResources() {
+ control.replay();
+
+ ResourceAggregate aggregate = new ResourceAggregate()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32)));
+ ThriftBackfill.backfillResourceAggregate(aggregate);
+ }
+
+ /**
+ * A production task whose named tier resolves to {@code PREFERRED_TIER} is expected to come
+ * back equal to the input.
+ */
+ @Test
+ public void testBackfillTierProduction() {
+ TaskConfig config = new TaskConfig()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))
+ .setProduction(true)
+ .setTier("tierName");
+ TaskConfig expected = config.deepCopy()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+ expect(tierManager.getTier(ITaskConfig.build(expected))).andReturn(TaskTestUtil.PREFERRED_TIER);
+
+ control.replay();
+
+ assertEquals(
+ expected,
+ thriftBackfill.backfillTask(config));
+ }
+
+ /**
+ * A task flagged as production whose named tier resolves to {@code DEV_TIER} is expected to
+ * come back with the production flag set to false.
+ */
+ @Test
+ public void testBackfillTierNotProduction() {
+ TaskConfig config = new TaskConfig()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))
+ .setProduction(true)
+ .setTier("tierName");
+ TaskConfig configWithBackfilledResources = config.deepCopy()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+ expect(tierManager.getTier(ITaskConfig.build(configWithBackfilledResources)))
+ .andReturn(TaskTestUtil.DEV_TIER);
+
+ control.replay();
+
+ TaskConfig expected = configWithBackfilledResources.deepCopy()
+ .setProduction(false);
+
+ assertEquals(
+ expected,
+ thriftBackfill.backfillTask(config));
+ }
+
+ /**
+ * A task with no tier and no production flag is expected to be assigned the "preemptible"
+ * tier, given the tiers from {@code TaskTestUtil.tierInfos()}.
+ */
+ @Test
+ public void testBackfillTierSetsTierToPreemptible() {
+ TaskConfig config = new TaskConfig()
+ .setResources(ImmutableSet.of(
+ numCpus(1.0),
+ ramMb(32),
+ diskMb(64)));
+ TaskConfig configWithBackfilledResources = config.deepCopy()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+ expect(tierManager.getTiers()).andReturn(TaskTestUtil.tierInfos());
+
+ control.replay();
+
+ TaskConfig expected = configWithBackfilledResources.deepCopy().setTier("preemptible");
+
+ assertEquals(
+ expected,
+ thriftBackfill.backfillTask(config));
+ }
+
+ /**
+ * A production task with no tier is expected to be assigned the "preferred" tier, given the
+ * tiers from {@code TaskTestUtil.tierInfos()}.
+ */
+ @Test
+ public void testBackfillTierSetsTierToPreferred() {
+ TaskConfig config = new TaskConfig()
+ .setResources(ImmutableSet.of(
+ numCpus(1.0),
+ ramMb(32),
+ diskMb(64)))
+ .setProduction(true);
+ TaskConfig configWithBackfilledResources = config.deepCopy()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+ expect(tierManager.getTiers()).andReturn(TaskTestUtil.tierInfos());
+
+ control.replay();
+
+ TaskConfig expected = configWithBackfilledResources.deepCopy().setTier("preferred");
+
+ assertEquals(
+ expected,
+ thriftBackfill.backfillTask(config));
+ }
+
+ /**
+ * With an empty tier map from the tier manager, backfilling a task that has no tier is
+ * expected to fail with {@code IllegalStateException}.
+ */
+ @Test(expected = IllegalStateException.class)
+ public void testBackfillTierBadTierConfiguration() {
+ TaskConfig config = new TaskConfig()
+ .setResources(ImmutableSet.of(
+ numCpus(1.0),
+ ramMb(32),
+ diskMb(64)));
+
+ expect(tierManager.getTiers()).andReturn(ImmutableMap.of());
+
+ control.replay();
+
+ thriftBackfill.backfillTask(config);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java
new file mode 100644
index 0000000..cbad3eb
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests how {@link TransactionRecorder} coalesces the ops that are added to it.
+ */
+public class TransactionRecorderTest {
+  @Test
+  public void testCoalesce() throws Exception {
+    // No coalescing - different operation types.
+    assertEquals(
+        ImmutableList.of(
+            Op.removeTasks(createRemoveTasks("1", "2")),
+            Op.saveTasks(createSaveTasks("4", "5"))),
+        record(
+            Op.removeTasks(createRemoveTasks("1", "2")),
+            Op.saveTasks(createSaveTasks("4", "5"))));
+
+    // Consecutive RemoveTasks ops become one op holding all of the task IDs.
+    assertEquals(
+        ImmutableList.of(Op.removeTasks(createRemoveTasks("1", "2", "3", "4"))),
+        record(
+            Op.removeTasks(createRemoveTasks("1", "2")),
+            Op.removeTasks(createRemoveTasks("3", "4"))));
+
+    // Consecutive SaveTasks ops become one op; task "1" is in both inputs but appears once.
+    assertEquals(
+        ImmutableList.of(Op.saveTasks(createSaveTasks("3", "2", "1"))),
+        record(Op.saveTasks(createSaveTasks("1", "2")), Op.saveTasks(createSaveTasks("1", "3"))));
+
+    // Same as the RemoveTasks case above, starting from an op that holds a single ID.
+    assertEquals(
+        ImmutableList.of(Op.removeTasks(createRemoveTasks("3", "4", "5"))),
+        record(
+            Op.removeTasks(createRemoveTasks("3")),
+            Op.removeTasks(createRemoveTasks("4", "5"))));
+  }
+
+  /** Adds {@code ops}, in order, to a fresh recorder and returns the ops it reports. */
+  private static List<Op> record(Op... ops) {
+    TransactionRecorder recorder = new TransactionRecorder();
+    Stream.of(ops).forEach(recorder::add);
+    return recorder.getOps();
+  }
+
+  /** Builds a SaveTasks holding one otherwise-empty scheduled task per given task ID. */
+  private static SaveTasks createSaveTasks(String... taskIds) {
+    return new SaveTasks().setTasks(
+        Stream.of(taskIds)
+            .map(id -> new ScheduledTask().setAssignedTask(new AssignedTask().setTaskId(id)))
+            .collect(Collectors.toSet())
+    );
+  }
+
+  /** Builds a RemoveTasks for the given task IDs. */
+  // Static like the other helpers: it reads no instance state.
+  private static RemoveTasks createRemoveTasks(String... taskIds) {
+    return new RemoveTasks(ImmutableSet.copyOf(taskIds));
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
new file mode 100644
index 0000000..e8b564b
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests {@code WriteAheadStorage} against a mocked {@code TransactionManager}, mocked stores
+ * and a mocked {@code EventSink}.
+ */
+public class WriteAheadStorageTest extends EasyMockTest {
+
+ private TransactionManager transactionManager;
+ private TaskStore.Mutable taskStore;
+ private AttributeStore.Mutable attributeStore;
+ private JobUpdateStore.Mutable jobUpdateStore;
+ private EventSink eventSink;
+ private WriteAheadStorage storage;
+
+ @Before
+ public void setUp() {
+ transactionManager = createMock(TransactionManager.class);
+ taskStore = createMock(TaskStore.Mutable.class);
+ attributeStore = createMock(AttributeStore.Mutable.class);
+ jobUpdateStore = createMock(JobUpdateStore.Mutable.class);
+ eventSink = createMock(EventSink.class);
+
+ // Stores that no test sets expectations on are passed as inline mocks.
+ storage = new WriteAheadStorage(
+ transactionManager,
+ createMock(SchedulerStore.Mutable.class),
+ createMock(CronJobStore.Mutable.class),
+ taskStore,
+ createMock(QuotaStore.Mutable.class),
+ attributeStore,
+ jobUpdateStore,
+ LoggerFactory.getLogger(WriteAheadStorageTest.class),
+ eventSink);
+ }
+
+ /**
+ * Expects an active-transaction check that returns true, followed by {@code op} being logged
+ * to the transaction manager.
+ */
+ private void expectOp(Op op) {
+ expect(transactionManager.hasActiveTransaction()).andReturn(true);
+ transactionManager.log(op);
+ }
+
+ /**
+ * Removing job updates is expected to reach the job update store without any op being
+ * logged.
+ */
+ @Test
+ public void testRemoveUpdates() {
+ Set<IJobUpdateKey> removed = ImmutableSet.of(
+ IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "a")),
+ IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "b")));
+ jobUpdateStore.removeJobUpdates(removed);
+ // No operation is written since this Op is in read-only compatibility mode.
+
+ control.replay();
+
+ storage.removeJobUpdates(removed);
+ }
+
+ /**
+ * Mutating a task is expected to return the task store's result and to log that result as a
+ * {@code SaveTasks} op.
+ */
+ @Test
+ public void testMutate() {
+ String taskId = "a";
+ Function<IScheduledTask, IScheduledTask> mutator =
+ createMock(new Clazz<Function<IScheduledTask, IScheduledTask>>() { });
+ Optional<IScheduledTask> mutated = Optional.of(TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB));
+
+ expect(taskStore.mutateTask(taskId, mutator)).andReturn(mutated);
+ expectOp(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+
+ control.replay();
+
+ assertEquals(mutated, storage.mutateTask(taskId, mutator));
+ }
+
+ /**
+ * Saves the same host attributes twice. The first save, reported by the attribute store as a
+ * change, is expected to log a {@code SaveHostAttributes} op and post a
+ * {@code HostAttributesChanged} event; the second, reported as no change, has no such
+ * expectations.
+ */
+ @Test
+ public void testSaveHostAttributes() {
+ IHostAttributes attributes = IHostAttributes.build(
+ new HostAttributes()
+ .setHost("a")
+ .setMode(MaintenanceMode.DRAINING)
+ .setAttributes(ImmutableSet.of(
+ new Attribute().setName("b").setValues(ImmutableSet.of("1", "2")))));
+
+ expect(attributeStore.saveHostAttributes(attributes)).andReturn(true);
+ expectOp(Op.saveHostAttributes(
+ new SaveHostAttributes().setHostAttributes(attributes.newBuilder())));
+ eventSink.post(new PubsubEvent.HostAttributesChanged(attributes));
+
+ expect(attributeStore.saveHostAttributes(attributes)).andReturn(false);
+
+ control.replay();
+
+ assertTrue(storage.saveHostAttributes(attributes));
+
+ assertFalse(storage.saveHostAttributes(attributes));
+ }
+
+ /** {@code deleteAllTasks()} is expected to throw {@code UnsupportedOperationException}. */
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeleteAllTasks() {
+ control.replay();
+ storage.deleteAllTasks();
+ }
+
+ /** {@code deleteHostAttributes()} is expected to throw {@code UnsupportedOperationException}. */
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeleteHostAttributes() {
+ control.replay();
+ storage.deleteHostAttributes();
+ }
+
+ /** {@code deleteJobs()} is expected to throw {@code UnsupportedOperationException}. */
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeleteJobs() {
+ control.replay();
+ storage.deleteJobs();
+ }
+
+ /** {@code deleteQuotas()} is expected to throw {@code UnsupportedOperationException}. */
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeleteQuotas() {
+ control.replay();
+ storage.deleteQuotas();
+ }
+
+ /** {@code deleteAllUpdates()} is expected to throw {@code UnsupportedOperationException}. */
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeleteAllUpdatesAndEvents() {
+ control.replay();
+ storage.deleteAllUpdates();
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
index 3f44559..cb38f10 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
@@ -21,7 +21,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.function.Consumer;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
@@ -37,10 +36,8 @@ import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Data;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.Attribute;
import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.storage.DeduplicatedSnapshot;
import org.apache.aurora.gen.storage.Frame;
import org.apache.aurora.gen.storage.FrameChunk;
@@ -48,9 +45,7 @@ import org.apache.aurora.gen.storage.FrameHeader;
import org.apache.aurora.gen.storage.LogEntry;
import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.gen.storage.RemoveJob;
-import org.apache.aurora.gen.storage.RemoveTasks;
import org.apache.aurora.gen.storage.SaveFrameworkId;
-import org.apache.aurora.gen.storage.SaveTasks;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.gen.storage.Transaction;
import org.apache.aurora.gen.storage.storageConstants;
@@ -112,11 +107,11 @@ public class LogManagerTest extends EasyMockTest {
public void testStreamManagerReadFromUnknownNone() throws CodingException {
expect(stream.readAll()).andReturn(Collections.emptyIterator());
- Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
-
control.replay();
- createNoMessagesStreamManager().readFromBeginning(reader);
+ assertEquals(
+ ImmutableList.of(),
+ ImmutableList.copyOf(createNoMessagesStreamManager().readFromBeginning()));
}
@Test
@@ -127,12 +122,11 @@ public class LogManagerTest extends EasyMockTest {
expect(entry1.contents()).andReturn(encode(transaction1));
expect(stream.readAll()).andReturn(Iterators.singletonIterator(entry1));
- Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
- reader.accept(transaction1);
-
control.replay();
- createNoMessagesStreamManager().readFromBeginning(reader);
+ assertEquals(
+ ImmutableList.of(transaction1),
+ ImmutableList.copyOf(createNoMessagesStreamManager().readFromBeginning()));
}
@Test
@@ -214,50 +208,6 @@ public class LogManagerTest extends EasyMockTest {
}
@Test
- public void testCoalesce() throws CodingException {
- SaveTasks saveTasks1 = createSaveTasks("1", "2");
- createSaveTasks("2");
- SaveTasks saveTasks2 = createSaveTasks("1", "3");
- SaveTasks saveTasks3 = createSaveTasks("4", "5");
-
- // saveTasks1 is unrepresented because both of its operations were trumped.
- // saveTasks3 is unrepresented because its operations were deleted.
- SaveTasks coalescedSaves = createSaveTasks("3", "2", "1");
-
- RemoveTasks removeTasks1 = createRemoveTasks("1", "2");
- RemoveTasks removeTasks2 = createRemoveTasks("3");
- RemoveTasks removeTasks3 = createRemoveTasks("4", "5");
-
- RemoveTasks coalescedRemoves =
- new RemoveTasks(ImmutableSet.copyOf(Iterables.concat(removeTasks2.getTaskIds(),
- removeTasks3.getTaskIds())));
-
- expectAppend(position1,
- createLogEntry(
- Op.saveTasks(coalescedSaves),
- Op.removeTasks(removeTasks1),
- Op.saveTasks(saveTasks3),
- Op.removeTasks(coalescedRemoves)));
-
- control.replay();
-
- StreamTransaction streamTransaction = createNoMessagesStreamManager().startTransaction();
-
- // The next 2 saves should coalesce
- streamTransaction.add(Op.saveTasks(saveTasks1));
- streamTransaction.add(Op.saveTasks(saveTasks2));
-
- streamTransaction.add(Op.removeTasks(removeTasks1));
- streamTransaction.add(Op.saveTasks(saveTasks3));
-
- // The next 2 removes should coalesce
- streamTransaction.add(Op.removeTasks(removeTasks2));
- streamTransaction.add(Op.removeTasks(removeTasks3));
-
- assertEquals(position1, streamTransaction.commit());
- }
-
- @Test
public void testTransactionSnapshot() throws CodingException {
Snapshot snapshot = createSnapshot();
DeduplicatedSnapshot deduplicated = new SnapshotDeduplicatorImpl().deduplicate(snapshot);
@@ -469,14 +419,12 @@ public class LogManagerTest extends EasyMockTest {
expect(stream.readAll()).andReturn(entries.iterator());
- Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
- reader.accept(transaction1);
- reader.accept(transaction2);
-
StreamManager streamManager = createStreamManager(message.chunkSize);
control.replay();
- streamManager.readFromBeginning(reader);
+ assertEquals(
+ ImmutableList.of(transaction1, transaction2),
+ ImmutableList.copyOf(streamManager.readFromBeginning()));
}
@Test
@@ -494,9 +442,6 @@ public class LogManagerTest extends EasyMockTest {
expect(stream.readAll()).andReturn(ImmutableList.of(snapshotEntry).iterator());
- Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
- reader.accept(snapshotLogEntry);
-
control.replay();
HashFunction md5 = Hashing.md5();
@@ -506,7 +451,9 @@ public class LogManagerTest extends EasyMockTest {
md5,
new SnapshotDeduplicatorImpl());
streamManager.snapshot(snapshot);
- streamManager.readFromBeginning(reader);
+ assertEquals(
+ ImmutableList.of(snapshotLogEntry),
+ ImmutableList.copyOf(streamManager.readFromBeginning()));
}
private Snapshot createSnapshot() {
@@ -517,15 +464,6 @@ public class LogManagerTest extends EasyMockTest {
.setTasks(ImmutableSet.of(TaskTestUtil.makeTask("task_id", TaskTestUtil.JOB).newBuilder()));
}
- private SaveTasks createSaveTasks(String... taskIds) {
- return new SaveTasks(ImmutableSet.copyOf(Iterables.transform(ImmutableList.copyOf(taskIds),
- taskId -> new ScheduledTask().setAssignedTask(new AssignedTask().setTaskId(taskId)))));
- }
-
- private RemoveTasks createRemoveTasks(String... taskIds) {
- return new RemoveTasks(ImmutableSet.copyOf(taskIds));
- }
-
private void expectFrames(Position position, Message message) throws CodingException {
expect(stream.append(entryEq(message.header))).andReturn(position);
for (LogEntry chunk : message.chunks) {