You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/12/03 03:59:11 UTC
[2/4] aurora git commit: Extract a storage Persistence layer
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
new file mode 100644
index 0000000..07912b6
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
@@ -0,0 +1,781 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.JobInstanceUpdateEvent;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateAction;
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.JobUpdateSettings;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.Range;
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
+import org.apache.aurora.gen.storage.RemoveJob;
+import org.apache.aurora.gen.storage.RemoveJobUpdates;
+import org.apache.aurora.gen.storage.RemoveQuota;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveFrameworkId;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdate;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.resources.ResourceTestUtil;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.Resource.diskMb;
+import static org.apache.aurora.gen.Resource.numCpus;
+import static org.apache.aurora.gen.Resource.ramMb;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeConfig;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class DurableStorageTest extends EasyMockTest {
+
+ private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "name");
+ private static final IJobUpdateKey UPDATE_ID =
+ IJobUpdateKey.build(new JobUpdateKey(JOB_KEY.newBuilder(), "testUpdateId"));
+
+ private DurableStorage durableStorage;
+ private Persistence persistence;
+ private StorageTestUtil storageUtil;
+ private EventSink eventSink;
+
+ @Before
+ public void setUp() {
+ persistence = createMock(Persistence.class);
+ storageUtil = new StorageTestUtil(this);
+ eventSink = createMock(EventSink.class);
+
+ durableStorage = new DurableStorage(
+ persistence,
+ storageUtil.storage,
+ storageUtil.schedulerStore,
+ storageUtil.jobStore,
+ storageUtil.taskStore,
+ storageUtil.quotaStore,
+ storageUtil.attributeStore,
+ storageUtil.jobUpdateStore,
+ eventSink,
+ new ReentrantLock(),
+ TaskTestUtil.THRIFT_BACKFILL);
+
+ storageUtil.storage.prepare();
+ }
+
+ @Test
+ public void testStart() throws Exception {
+ // We should initialize persistence.
+ persistence.prepare();
+
+ // Our start should recover persistence and then forward to the underlying storage start of the
+ // supplied initialization logic.
+ AtomicBoolean initialized = new AtomicBoolean(false);
+ MutateWork.NoResult.Quiet initializationLogic = provider -> {
+ // Creating a mock and expecting apply(storeProvider) does not work here for whatever
+ // reason.
+ initialized.set(true);
+ };
+
+ Capture<MutateWork.NoResult.Quiet> recoverAndInitializeWork = createCapture();
+ storageUtil.storage.write(capture(recoverAndInitializeWork));
+ expectLastCall().andAnswer(() -> {
+ recoverAndInitializeWork.getValue().apply(storageUtil.mutableStoreProvider);
+ return null;
+ });
+
+ Capture<MutateWork<Void, RuntimeException>> initializationWork = createCapture();
+ expect(storageUtil.storage.write(capture(initializationWork))).andAnswer(
+ () -> {
+ initializationWork.getValue().apply(storageUtil.mutableStoreProvider);
+ return null;
+ });
+
+ // Populate all Op types.
+ buildReplayOps();
+
+ control.replay();
+
+ durableStorage.prepare();
+ durableStorage.start(initializationLogic);
+ assertTrue(initialized.get());
+
+ // Assert all Transaction types have handlers defined.
+ assertEquals(
+ EnumSet.allOf(Op._Fields.class),
+ EnumSet.copyOf(durableStorage.buildTransactionReplayActions().keySet()));
+ }
+
+ private void buildReplayOps() throws Exception {
+ ImmutableSet.Builder<Op> builder = ImmutableSet.builder();
+
+ builder.add(Op.saveFrameworkId(new SaveFrameworkId("bob")));
+ storageUtil.schedulerStore.saveFrameworkId("bob");
+
+ JobConfiguration actualJob = new JobConfiguration().setTaskConfig(nonBackfilledConfig());
+ JobConfiguration expectedJob =
+ new JobConfiguration().setTaskConfig(makeConfig(JOB_KEY).newBuilder());
+ SaveCronJob cronJob = new SaveCronJob().setJobConfig(actualJob);
+ builder.add(Op.saveCronJob(cronJob));
+ storageUtil.jobStore.saveAcceptedJob(IJobConfiguration.build(expectedJob));
+
+ RemoveJob removeJob = new RemoveJob(JOB_KEY.newBuilder());
+ builder.add(Op.removeJob(removeJob));
+ storageUtil.jobStore.removeJob(JOB_KEY);
+
+ ScheduledTask actualTask = makeTask("id", JOB_KEY).newBuilder();
+ actualTask.getAssignedTask().setTask(nonBackfilledConfig());
+ IScheduledTask expectedTask = makeTask("id", JOB_KEY);
+ SaveTasks saveTasks = new SaveTasks(ImmutableSet.of(actualTask));
+ builder.add(Op.saveTasks(saveTasks));
+ storageUtil.taskStore.saveTasks(ImmutableSet.of(expectedTask));
+
+ RemoveTasks removeTasks = new RemoveTasks(ImmutableSet.of("taskId1"));
+ builder.add(Op.removeTasks(removeTasks));
+ storageUtil.taskStore.deleteTasks(removeTasks.getTaskIds());
+
+ ResourceAggregate nonBackfilled = new ResourceAggregate()
+ .setNumCpus(1.0)
+ .setRamMb(32)
+ .setDiskMb(64);
+ SaveQuota saveQuota = new SaveQuota(JOB_KEY.getRole(), nonBackfilled);
+ builder.add(Op.saveQuota(saveQuota));
+ storageUtil.quotaStore.saveQuota(
+ saveQuota.getRole(),
+ IResourceAggregate.build(nonBackfilled.deepCopy()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))));
+
+ builder.add(Op.removeQuota(new RemoveQuota(JOB_KEY.getRole())));
+ storageUtil.quotaStore.removeQuota(JOB_KEY.getRole());
+
+ // This entry lacks a slave ID, and should therefore be discarded.
+ SaveHostAttributes hostAttributes1 = new SaveHostAttributes(new HostAttributes()
+ .setHost("host1")
+ .setMode(MaintenanceMode.DRAINED));
+ builder.add(Op.saveHostAttributes(hostAttributes1));
+
+ SaveHostAttributes hostAttributes2 = new SaveHostAttributes(new HostAttributes()
+ .setHost("host2")
+ .setSlaveId("slave2")
+ .setMode(MaintenanceMode.DRAINED));
+ builder.add(Op.saveHostAttributes(hostAttributes2));
+ expect(storageUtil.attributeStore.saveHostAttributes(
+ IHostAttributes.build(hostAttributes2.getHostAttributes()))).andReturn(true);
+
+ JobUpdate actualUpdate = new JobUpdate()
+ .setSummary(new JobUpdateSummary().setKey(UPDATE_ID.newBuilder()))
+ .setInstructions(new JobUpdateInstructions()
+ .setInitialState(
+ ImmutableSet.of(new InstanceTaskConfig().setTask(nonBackfilledConfig())))
+ .setDesiredState(new InstanceTaskConfig().setTask(nonBackfilledConfig())));
+ JobUpdate expectedUpdate = actualUpdate.deepCopy();
+ expectedUpdate.getInstructions().getDesiredState().setTask(makeConfig(JOB_KEY).newBuilder());
+ expectedUpdate.getInstructions().getInitialState()
+ .forEach(e -> e.setTask(makeConfig(JOB_KEY).newBuilder()));
+ SaveJobUpdate saveUpdate = new SaveJobUpdate().setJobUpdate(actualUpdate);
+ builder.add(Op.saveJobUpdate(saveUpdate));
+ storageUtil.jobUpdateStore.saveJobUpdate(IJobUpdate.build(expectedUpdate));
+
+ SaveJobUpdateEvent saveUpdateEvent =
+ new SaveJobUpdateEvent(new JobUpdateEvent(), UPDATE_ID.newBuilder());
+ builder.add(Op.saveJobUpdateEvent(saveUpdateEvent));
+ storageUtil.jobUpdateStore.saveJobUpdateEvent(
+ UPDATE_ID,
+ IJobUpdateEvent.build(saveUpdateEvent.getEvent()));
+
+ SaveJobInstanceUpdateEvent saveInstanceEvent = new SaveJobInstanceUpdateEvent(
+ new JobInstanceUpdateEvent(),
+ UPDATE_ID.newBuilder());
+ builder.add(Op.saveJobInstanceUpdateEvent(saveInstanceEvent));
+ storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
+ UPDATE_ID,
+ IJobInstanceUpdateEvent.build(saveInstanceEvent.getEvent()));
+
+ builder.add(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory(5, 10L)));
+ // No expectation - this op is ignored.
+
+ builder.add(Op.removeJobUpdate(
+ new RemoveJobUpdates().setKeys(ImmutableSet.of(UPDATE_ID.newBuilder()))));
+ storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(UPDATE_ID));
+
+ expect(persistence.recover()).andReturn(builder.build().stream());
+ }
+
+ private TaskConfig nonBackfilledConfig() {
+ // When more fields have to be backfilled
+ // modify this method.
+ return makeConfig(JOB_KEY).newBuilder();
+ }
+
+ abstract class AbstractStorageFixture {
+ private final AtomicBoolean runCalled = new AtomicBoolean(false);
+
+ AbstractStorageFixture() {
+ // Prevent otherwise silent noop tests that forget to call run().
+ addTearDown(new TearDown() {
+ @Override
+ public void tearDown() {
+ assertTrue(runCalled.get());
+ }
+ });
+ }
+
+ void run() throws Exception {
+ runCalled.set(true);
+
+ // Expect basic start operations.
+
+ // Initialize persistence.
+ persistence.prepare();
+
+ // Replay the ops and perform and supplied initializationWork.
+ // Simulate NOOP initialization work
+ // Creating a mock and expecting apply(storeProvider) does not work here for whatever
+ // reason.
+ MutateWork.NoResult.Quiet initializationLogic = storeProvider -> {
+ // No-op.
+ };
+
+ Capture<MutateWork.NoResult.Quiet> recoverAndInitializeWork = createCapture();
+ storageUtil.storage.write(capture(recoverAndInitializeWork));
+ expectLastCall().andAnswer(() -> {
+ recoverAndInitializeWork.getValue().apply(storageUtil.mutableStoreProvider);
+ return null;
+ });
+
+ expect(persistence.recover()).andReturn(Stream.empty());
+ Capture<MutateWork<Void, RuntimeException>> recoveryWork = createCapture();
+ expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer(
+ () -> {
+ recoveryWork.getValue().apply(storageUtil.mutableStoreProvider);
+ return null;
+ });
+
+ // Setup custom test expectations.
+ setupExpectations();
+
+ control.replay();
+
+ // Start the system.
+ durableStorage.prepare();
+ durableStorage.start(initializationLogic);
+
+ // Exercise the system.
+ runTest();
+ }
+
+ protected void setupExpectations() throws Exception {
+ // Default to no expectations.
+ }
+
+ protected abstract void runTest();
+ }
+
+ abstract class AbstractMutationFixture extends AbstractStorageFixture {
+ @Override
+ protected void runTest() {
+ durableStorage.write((Quiet) AbstractMutationFixture.this::performMutations);
+ }
+
+ protected abstract void performMutations(MutableStoreProvider storeProvider);
+ }
+
+ private void expectPersist(Op op, Op... ops) {
+ try {
+ // Workaround for comparing streams.
+ persistence.persist(anyObject());
+ expectLastCall().andAnswer((IAnswer<Void>) () -> {
+ assertEquals(
+ ImmutableList.<Op>builder().add(op).add(ops).build(),
+ ((Stream<Op>) EasyMock.getCurrentArguments()[0]).collect(Collectors.toList()));
+
+ return null;
+ });
+ } catch (Persistence.PersistenceException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testSaveFrameworkId() throws Exception {
+ String frameworkId = "bob";
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.schedulerStore.saveFrameworkId(frameworkId);
+ expectPersist(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getSchedulerStore().saveFrameworkId(frameworkId);
+ }
+ }.run();
+ }
+
+ @Test
+ public void testSaveAcceptedJob() throws Exception {
+ IJobConfiguration jobConfig =
+ IJobConfiguration.build(new JobConfiguration().setKey(JOB_KEY.newBuilder()));
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.jobStore.saveAcceptedJob(jobConfig);
+ expectPersist(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getCronJobStore().saveAcceptedJob(jobConfig);
+ }
+ }.run();
+ }
+
+ @Test
+ public void testRemoveJob() throws Exception {
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.jobStore.removeJob(JOB_KEY);
+ expectPersist(Op.removeJob(new RemoveJob().setJobKey(JOB_KEY.newBuilder())));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getCronJobStore().removeJob(JOB_KEY);
+ }
+ }.run();
+ }
+
+ @Test
+ public void testSaveTasks() throws Exception {
+ Set<IScheduledTask> tasks = ImmutableSet.of(task("a", ScheduleStatus.INIT));
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.taskStore.saveTasks(tasks);
+ expectPersist(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(tasks))));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().saveTasks(tasks);
+ }
+ }.run();
+ }
+
+ @Test
+ public void testMutateTasks() throws Exception {
+ String taskId = "fred";
+ Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
+ Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
+ expectPersist(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+ }
+ }.run();
+ }
+
+ @Test
+ public void testNestedTransactions() throws Exception {
+ String taskId = "fred";
+ Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
+ Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
+ ImmutableSet<String> tasksToRemove = ImmutableSet.of("b");
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
+
+ storageUtil.taskStore.deleteTasks(tasksToRemove);
+
+ expectPersist(
+ Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))),
+ Op.removeTasks(new RemoveTasks(tasksToRemove)));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+
+ durableStorage.write((NoResult.Quiet)
+ innerProvider -> innerProvider.getUnsafeTaskStore().deleteTasks(tasksToRemove));
+ }
+ }.run();
+ }
+
+ @Test
+ public void testSaveAndMutateTasks() throws Exception {
+ String taskId = "fred";
+ Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
+ Set<IScheduledTask> saved = ImmutableSet.of(task("a", ScheduleStatus.INIT));
+ Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.taskStore.saveTasks(saved);
+
+ // Nested transaction with result.
+ expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
+
+ // Resulting stream operation.
+ expectPersist(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().saveTasks(saved);
+ assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+ }
+ }.run();
+ }
+
+ @Test
+ public void testSaveAndMutateTasksNoCoalesceUniqueIds() throws Exception {
+ String taskId = "fred";
+ Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
+ Set<IScheduledTask> saved = ImmutableSet.of(task("b", ScheduleStatus.INIT));
+ Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.taskStore.saveTasks(saved);
+
+ // Nested transaction with result.
+ expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
+
+ // Resulting stream operation.
+ expectPersist(Op.saveTasks(new SaveTasks(
+ ImmutableSet.<ScheduledTask>builder()
+ .addAll(IScheduledTask.toBuildersList(saved))
+ .add(mutated.get().newBuilder())
+ .build())));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().saveTasks(saved);
+ assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+ }
+ }.run();
+ }
+
+ @Test
+ public void testRemoveTasksQuery() throws Exception {
+ IScheduledTask task = task("a", ScheduleStatus.FINISHED);
+ Set<String> taskIds = Tasks.ids(task);
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.taskStore.deleteTasks(taskIds);
+ expectPersist(Op.removeTasks(new RemoveTasks(taskIds)));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
+ }
+ }.run();
+ }
+
+ @Test
+ public void testRemoveTasksIds() throws Exception {
+ Set<String> taskIds = ImmutableSet.of("42");
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.taskStore.deleteTasks(taskIds);
+ expectPersist(Op.removeTasks(new RemoveTasks(taskIds)));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
+ }
+ }.run();
+ }
+
+ @Test
+ public void testSaveQuota() throws Exception {
+ String role = "role";
+ IResourceAggregate quota = ResourceTestUtil.aggregate(1.0, 128L, 1024L);
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.quotaStore.saveQuota(role, quota);
+ expectPersist(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getQuotaStore().saveQuota(role, quota);
+ }
+ }.run();
+ }
+
+ @Test
+ public void testRemoveQuota() throws Exception {
+ String role = "role";
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.quotaStore.removeQuota(role);
+ expectPersist(Op.removeQuota(new RemoveQuota(role)));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getQuotaStore().removeQuota(role);
+ }
+ }.run();
+ }
+
+ @Test
+ public void testSaveHostAttributes() throws Exception {
+ String host = "hostname";
+ Set<Attribute> attributes =
+ ImmutableSet.of(new Attribute().setName("attr").setValues(ImmutableSet.of("value")));
+ Optional<IHostAttributes> hostAttributes = Optional.of(
+ IHostAttributes.build(new HostAttributes()
+ .setHost(host)
+ .setAttributes(attributes)));
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ expect(storageUtil.attributeStore.getHostAttributes(host))
+ .andReturn(Optional.absent());
+
+ expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes);
+
+ expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get())).andReturn(true);
+ eventSink.post(new PubsubEvent.HostAttributesChanged(hostAttributes.get()));
+ expectPersist(
+ Op.saveHostAttributes(new SaveHostAttributes(hostAttributes.get().newBuilder())));
+
+ expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get()))
+ .andReturn(false);
+
+ expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes);
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ AttributeStore.Mutable store = storeProvider.getAttributeStore();
+ assertEquals(Optional.absent(), store.getHostAttributes(host));
+
+ assertTrue(store.saveHostAttributes(hostAttributes.get()));
+
+ assertEquals(hostAttributes, store.getHostAttributes(host));
+
+ assertFalse(store.saveHostAttributes(hostAttributes.get()));
+
+ assertEquals(hostAttributes, store.getHostAttributes(host));
+ }
+ }.run();
+ }
+
+ @Test
+ public void testSaveUpdate() throws Exception {
+ IJobUpdate update = IJobUpdate.build(new JobUpdate()
+ .setSummary(new JobUpdateSummary()
+ .setKey(UPDATE_ID.newBuilder())
+ .setUser("user"))
+ .setInstructions(new JobUpdateInstructions()
+ .setDesiredState(new InstanceTaskConfig()
+ .setTask(new TaskConfig())
+ .setInstances(ImmutableSet.of(new Range(0, 3))))
+ .setInitialState(ImmutableSet.of(new InstanceTaskConfig()
+ .setTask(new TaskConfig())
+ .setInstances(ImmutableSet.of(new Range(0, 3)))))
+ .setSettings(new JobUpdateSettings())));
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.jobUpdateStore.saveJobUpdate(update);
+ expectPersist(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getJobUpdateStore().saveJobUpdate(update);
+ }
+ }.run();
+ }
+
+ @Test
+ public void testSaveJobUpdateEvent() throws Exception {
+ IJobUpdateEvent event = IJobUpdateEvent.build(new JobUpdateEvent()
+ .setStatus(JobUpdateStatus.ROLLING_BACK)
+ .setTimestampMs(12345L));
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.jobUpdateStore.saveJobUpdateEvent(UPDATE_ID, event);
+ expectPersist(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(
+ event.newBuilder(),
+ UPDATE_ID.newBuilder())));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getJobUpdateStore().saveJobUpdateEvent(UPDATE_ID, event);
+ }
+ }.run();
+ }
+
+ @Test
+ public void testSaveJobInstanceUpdateEvent() throws Exception {
+ IJobInstanceUpdateEvent event = IJobInstanceUpdateEvent.build(new JobInstanceUpdateEvent()
+ .setAction(JobUpdateAction.INSTANCE_ROLLING_BACK)
+ .setTimestampMs(12345L)
+ .setInstanceId(0));
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(UPDATE_ID, event);
+ expectPersist(Op.saveJobInstanceUpdateEvent(
+ new SaveJobInstanceUpdateEvent(
+ event.newBuilder(),
+ UPDATE_ID.newBuilder())));
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getJobUpdateStore().saveJobInstanceUpdateEvent(UPDATE_ID, event);
+ }
+ }.run();
+ }
+
+ @Test
+ public void testRemoveJobUpdates() throws Exception {
+ IJobUpdateKey key = IJobUpdateKey.build(new JobUpdateKey()
+ .setJob(JOB_KEY.newBuilder())
+ .setId("update-id"));
+
+ new AbstractMutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWrite();
+ storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(key));
+
+ // No transaction is generated since this version is currently in 'read-only'
+ // compatibility mode for this operation type.
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getJobUpdateStore().removeJobUpdates(ImmutableSet.of(key));
+ }
+ }.run();
+ }
+
+ private static IScheduledTask task(String id, ScheduleStatus status) {
+ return IScheduledTask.build(new ScheduledTask()
+ .setStatus(status)
+ .setAssignedTask(new AssignedTask()
+ .setTaskId(id)
+ .setTask(new TaskConfig())));
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java
new file mode 100644
index 0000000..219576b
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfillTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.Resource.diskMb;
+import static org.apache.aurora.gen.Resource.namedPort;
+import static org.apache.aurora.gen.Resource.numCpus;
+import static org.apache.aurora.gen.Resource.ramMb;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class ThriftBackfillTest extends EasyMockTest {
+
+ private ThriftBackfill thriftBackfill;
+ private TierManager tierManager;
+
+ @Before
+ public void setUp() {
+ tierManager = createMock(TierManager.class);
+ thriftBackfill = new ThriftBackfill(tierManager);
+ }
+
+ @Test
+ public void testFieldsToSetNoPorts() {
+ TaskConfig config = new TaskConfig()
+ .setResources(ImmutableSet.of(
+ numCpus(1.0),
+ ramMb(32),
+ diskMb(64)))
+ .setProduction(false)
+ .setTier("tierName");
+ TaskConfig expected = config.deepCopy()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+ expect(tierManager.getTier(ITaskConfig.build(expected))).andReturn(TaskTestUtil.DEV_TIER);
+
+ control.replay();
+
+ assertEquals(
+ expected,
+ thriftBackfill.backfillTask(config));
+ }
+
+ @Test
+ public void testResourceAggregateFieldsToSet() {
+ control.replay();
+
+ ResourceAggregate aggregate = new ResourceAggregate()
+ .setNumCpus(1.0)
+ .setRamMb(32)
+ .setDiskMb(64);
+
+ IResourceAggregate expected = IResourceAggregate.build(aggregate.deepCopy()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64))));
+
+ assertEquals(expected, ThriftBackfill.backfillResourceAggregate(aggregate));
+ }
+
+ @Test
+ public void testResourceAggregateSetToFields() {
+ control.replay();
+
+ ResourceAggregate aggregate = new ResourceAggregate()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+ IResourceAggregate expected = IResourceAggregate.build(aggregate.deepCopy()
+ .setNumCpus(1.0)
+ .setRamMb(32)
+ .setDiskMb(64));
+
+ assertEquals(expected, ThriftBackfill.backfillResourceAggregate(aggregate));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testResourceAggregateTooManyResources() {
+ control.replay();
+
+ ResourceAggregate aggregate = new ResourceAggregate()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64), numCpus(2.0)));
+ ThriftBackfill.backfillResourceAggregate(aggregate);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testResourceAggregateInvalidResources() {
+ control.replay();
+
+ ResourceAggregate aggregate = new ResourceAggregate()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), namedPort("http")));
+ ThriftBackfill.backfillResourceAggregate(aggregate);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testResourceAggregateMissingResources() {
+ control.replay();
+
+ ResourceAggregate aggregate = new ResourceAggregate()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32)));
+ ThriftBackfill.backfillResourceAggregate(aggregate);
+ }
+
+ @Test
+ public void testBackfillTierProduction() {
+ TaskConfig config = new TaskConfig()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))
+ .setProduction(true)
+ .setTier("tierName");
+ TaskConfig expected = config.deepCopy()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+ expect(tierManager.getTier(ITaskConfig.build(expected))).andReturn(TaskTestUtil.PREFERRED_TIER);
+
+ control.replay();
+
+ assertEquals(
+ expected,
+ thriftBackfill.backfillTask(config));
+ }
+
+ @Test
+ public void testBackfillTierNotProduction() {
+ TaskConfig config = new TaskConfig()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))
+ .setProduction(true)
+ .setTier("tierName");
+ TaskConfig configWithBackfilledResources = config.deepCopy()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+ expect(tierManager.getTier(ITaskConfig.build(configWithBackfilledResources)))
+ .andReturn(TaskTestUtil.DEV_TIER);
+
+ control.replay();
+
+ TaskConfig expected = configWithBackfilledResources.deepCopy()
+ .setProduction(false);
+
+ assertEquals(
+ expected,
+ thriftBackfill.backfillTask(config));
+ }
+
+ @Test
+ public void testBackfillTierSetsTierToPreemptible() {
+ TaskConfig config = new TaskConfig()
+ .setResources(ImmutableSet.of(
+ numCpus(1.0),
+ ramMb(32),
+ diskMb(64)));
+ TaskConfig configWithBackfilledResources = config.deepCopy()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+ expect(tierManager.getTiers()).andReturn(TaskTestUtil.tierInfos());
+
+ control.replay();
+
+ TaskConfig expected = configWithBackfilledResources.deepCopy().setTier("preemptible");
+
+ assertEquals(
+ expected,
+ thriftBackfill.backfillTask(config));
+ }
+
+ @Test
+ public void testBackfillTierSetsTierToPreferred() {
+ TaskConfig config = new TaskConfig()
+ .setResources(ImmutableSet.of(
+ numCpus(1.0),
+ ramMb(32),
+ diskMb(64)))
+ .setProduction(true);
+ TaskConfig configWithBackfilledResources = config.deepCopy()
+ .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
+
+ expect(tierManager.getTiers()).andReturn(TaskTestUtil.tierInfos());
+
+ control.replay();
+
+ TaskConfig expected = configWithBackfilledResources.deepCopy().setTier("preferred");
+
+ assertEquals(
+ expected,
+ thriftBackfill.backfillTask(config));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testBackfillTierBadTierConfiguration() {
+ TaskConfig config = new TaskConfig()
+ .setResources(ImmutableSet.of(
+ numCpus(1.0),
+ ramMb(32),
+ diskMb(64)));
+
+ expect(tierManager.getTiers()).andReturn(ImmutableMap.of());
+
+ control.replay();
+
+ thriftBackfill.backfillTask(config);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java
new file mode 100644
index 0000000..cbad3eb
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorderTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TransactionRecorderTest {
+ @Test
+ public void testCoalesce() throws Exception {
+ // No coalescing - different operation types.
+ assertEquals(
+ ImmutableList.of(
+ Op.removeTasks(createRemoveTasks("1", "2")),
+ Op.saveTasks(createSaveTasks("4", "5"))),
+ record(
+ Op.removeTasks(createRemoveTasks("1", "2")),
+ Op.saveTasks(createSaveTasks("4", "5"))));
+
+ assertEquals(
+ ImmutableList.of(Op.removeTasks(createRemoveTasks("1", "2", "3", "4"))),
+ record(
+ Op.removeTasks(createRemoveTasks("1", "2")),
+ Op.removeTasks(createRemoveTasks("3", "4"))));
+
+ assertEquals(
+ ImmutableList.of(Op.saveTasks(createSaveTasks("3", "2", "1"))),
+ record(Op.saveTasks(createSaveTasks("1", "2")), Op.saveTasks(createSaveTasks("1", "3"))));
+
+ assertEquals(
+ ImmutableList.of(Op.removeTasks(createRemoveTasks("3", "4", "5"))),
+ record(
+ Op.removeTasks(createRemoveTasks("3")),
+ Op.removeTasks(createRemoveTasks("4", "5"))));
+ }
+
+ private static List<Op> record(Op... ops) {
+ TransactionRecorder recorder = new TransactionRecorder();
+ Stream.of(ops).forEach(recorder::add);
+ return recorder.getOps();
+ }
+
+ private static SaveTasks createSaveTasks(String... taskIds) {
+ return new SaveTasks().setTasks(
+ Stream.of(taskIds)
+ .map(id -> new ScheduledTask().setAssignedTask(new AssignedTask().setTaskId(id)))
+ .collect(Collectors.toSet())
+ );
+ }
+
+ private RemoveTasks createRemoveTasks(String... taskIds) {
+ return new RemoveTasks(ImmutableSet.copyOf(taskIds));
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
new file mode 100644
index 0000000..e8b564b
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class WriteAheadStorageTest extends EasyMockTest {
+
+ private TransactionManager transactionManager;
+ private TaskStore.Mutable taskStore;
+ private AttributeStore.Mutable attributeStore;
+ private JobUpdateStore.Mutable jobUpdateStore;
+ private EventSink eventSink;
+ private WriteAheadStorage storage;
+
+ @Before
+ public void setUp() {
+ transactionManager = createMock(TransactionManager.class);
+ taskStore = createMock(TaskStore.Mutable.class);
+ attributeStore = createMock(AttributeStore.Mutable.class);
+ jobUpdateStore = createMock(JobUpdateStore.Mutable.class);
+ eventSink = createMock(EventSink.class);
+
+ storage = new WriteAheadStorage(
+ transactionManager,
+ createMock(SchedulerStore.Mutable.class),
+ createMock(CronJobStore.Mutable.class),
+ taskStore,
+ createMock(QuotaStore.Mutable.class),
+ attributeStore,
+ jobUpdateStore,
+ LoggerFactory.getLogger(WriteAheadStorageTest.class),
+ eventSink);
+ }
+
+ private void expectOp(Op op) {
+ expect(transactionManager.hasActiveTransaction()).andReturn(true);
+ transactionManager.log(op);
+ }
+
+ @Test
+ public void testRemoveUpdates() {
+ Set<IJobUpdateKey> removed = ImmutableSet.of(
+ IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "a")),
+ IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "b")));
+ jobUpdateStore.removeJobUpdates(removed);
+ // No operation is written since this Op is in read-only compatibility mode.
+
+ control.replay();
+
+ storage.removeJobUpdates(removed);
+ }
+
+ @Test
+ public void testMutate() {
+ String taskId = "a";
+ Function<IScheduledTask, IScheduledTask> mutator =
+ createMock(new Clazz<Function<IScheduledTask, IScheduledTask>>() { });
+ Optional<IScheduledTask> mutated = Optional.of(TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB));
+
+ expect(taskStore.mutateTask(taskId, mutator)).andReturn(mutated);
+ expectOp(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+
+ control.replay();
+
+ assertEquals(mutated, storage.mutateTask(taskId, mutator));
+ }
+
+ @Test
+ public void testSaveHostAttributes() {
+ IHostAttributes attributes = IHostAttributes.build(
+ new HostAttributes()
+ .setHost("a")
+ .setMode(MaintenanceMode.DRAINING)
+ .setAttributes(ImmutableSet.of(
+ new Attribute().setName("b").setValues(ImmutableSet.of("1", "2")))));
+
+ expect(attributeStore.saveHostAttributes(attributes)).andReturn(true);
+ expectOp(Op.saveHostAttributes(
+ new SaveHostAttributes().setHostAttributes(attributes.newBuilder())));
+ eventSink.post(new PubsubEvent.HostAttributesChanged(attributes));
+
+ expect(attributeStore.saveHostAttributes(attributes)).andReturn(false);
+
+ control.replay();
+
+ assertTrue(storage.saveHostAttributes(attributes));
+
+ assertFalse(storage.saveHostAttributes(attributes));
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeleteAllTasks() {
+ control.replay();
+ storage.deleteAllTasks();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeleteHostAttributes() {
+ control.replay();
+ storage.deleteHostAttributes();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeleteJobs() {
+ control.replay();
+ storage.deleteJobs();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeleteQuotas() {
+ control.replay();
+ storage.deleteQuotas();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeleteAllUpdatesAndEvents() {
+ control.replay();
+ storage.deleteAllUpdates();
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
index 3f44559..cb38f10 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
@@ -21,7 +21,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.function.Consumer;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
@@ -37,10 +36,8 @@ import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Data;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.Attribute;
import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.storage.DeduplicatedSnapshot;
import org.apache.aurora.gen.storage.Frame;
import org.apache.aurora.gen.storage.FrameChunk;
@@ -48,9 +45,7 @@ import org.apache.aurora.gen.storage.FrameHeader;
import org.apache.aurora.gen.storage.LogEntry;
import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.gen.storage.RemoveJob;
-import org.apache.aurora.gen.storage.RemoveTasks;
import org.apache.aurora.gen.storage.SaveFrameworkId;
-import org.apache.aurora.gen.storage.SaveTasks;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.gen.storage.Transaction;
import org.apache.aurora.gen.storage.storageConstants;
@@ -112,11 +107,11 @@ public class LogManagerTest extends EasyMockTest {
public void testStreamManagerReadFromUnknownNone() throws CodingException {
expect(stream.readAll()).andReturn(Collections.emptyIterator());
- Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
-
control.replay();
- createNoMessagesStreamManager().readFromBeginning(reader);
+ assertEquals(
+ ImmutableList.of(),
+ ImmutableList.copyOf(createNoMessagesStreamManager().readFromBeginning()));
}
@Test
@@ -127,12 +122,11 @@ public class LogManagerTest extends EasyMockTest {
expect(entry1.contents()).andReturn(encode(transaction1));
expect(stream.readAll()).andReturn(Iterators.singletonIterator(entry1));
- Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
- reader.accept(transaction1);
-
control.replay();
- createNoMessagesStreamManager().readFromBeginning(reader);
+ assertEquals(
+ ImmutableList.of(transaction1),
+ ImmutableList.copyOf(createNoMessagesStreamManager().readFromBeginning()));
}
@Test
@@ -214,50 +208,6 @@ public class LogManagerTest extends EasyMockTest {
}
@Test
- public void testCoalesce() throws CodingException {
- SaveTasks saveTasks1 = createSaveTasks("1", "2");
- createSaveTasks("2");
- SaveTasks saveTasks2 = createSaveTasks("1", "3");
- SaveTasks saveTasks3 = createSaveTasks("4", "5");
-
- // saveTasks1 is unrepresented because both of its operations were trumped.
- // saveTasks3 is unrepresented because its operations were deleted.
- SaveTasks coalescedSaves = createSaveTasks("3", "2", "1");
-
- RemoveTasks removeTasks1 = createRemoveTasks("1", "2");
- RemoveTasks removeTasks2 = createRemoveTasks("3");
- RemoveTasks removeTasks3 = createRemoveTasks("4", "5");
-
- RemoveTasks coalescedRemoves =
- new RemoveTasks(ImmutableSet.copyOf(Iterables.concat(removeTasks2.getTaskIds(),
- removeTasks3.getTaskIds())));
-
- expectAppend(position1,
- createLogEntry(
- Op.saveTasks(coalescedSaves),
- Op.removeTasks(removeTasks1),
- Op.saveTasks(saveTasks3),
- Op.removeTasks(coalescedRemoves)));
-
- control.replay();
-
- StreamTransaction streamTransaction = createNoMessagesStreamManager().startTransaction();
-
- // The next 2 saves should coalesce
- streamTransaction.add(Op.saveTasks(saveTasks1));
- streamTransaction.add(Op.saveTasks(saveTasks2));
-
- streamTransaction.add(Op.removeTasks(removeTasks1));
- streamTransaction.add(Op.saveTasks(saveTasks3));
-
- // The next 2 removes should coalesce
- streamTransaction.add(Op.removeTasks(removeTasks2));
- streamTransaction.add(Op.removeTasks(removeTasks3));
-
- assertEquals(position1, streamTransaction.commit());
- }
-
- @Test
public void testTransactionSnapshot() throws CodingException {
Snapshot snapshot = createSnapshot();
DeduplicatedSnapshot deduplicated = new SnapshotDeduplicatorImpl().deduplicate(snapshot);
@@ -469,14 +419,12 @@ public class LogManagerTest extends EasyMockTest {
expect(stream.readAll()).andReturn(entries.iterator());
- Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
- reader.accept(transaction1);
- reader.accept(transaction2);
-
StreamManager streamManager = createStreamManager(message.chunkSize);
control.replay();
- streamManager.readFromBeginning(reader);
+ assertEquals(
+ ImmutableList.of(transaction1, transaction2),
+ ImmutableList.copyOf(streamManager.readFromBeginning()));
}
@Test
@@ -494,9 +442,6 @@ public class LogManagerTest extends EasyMockTest {
expect(stream.readAll()).andReturn(ImmutableList.of(snapshotEntry).iterator());
- Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
- reader.accept(snapshotLogEntry);
-
control.replay();
HashFunction md5 = Hashing.md5();
@@ -506,7 +451,9 @@ public class LogManagerTest extends EasyMockTest {
md5,
new SnapshotDeduplicatorImpl());
streamManager.snapshot(snapshot);
- streamManager.readFromBeginning(reader);
+ assertEquals(
+ ImmutableList.of(snapshotLogEntry),
+ ImmutableList.copyOf(streamManager.readFromBeginning()));
}
private Snapshot createSnapshot() {
@@ -517,15 +464,6 @@ public class LogManagerTest extends EasyMockTest {
.setTasks(ImmutableSet.of(TaskTestUtil.makeTask("task_id", TaskTestUtil.JOB).newBuilder()));
}
- private SaveTasks createSaveTasks(String... taskIds) {
- return new SaveTasks(ImmutableSet.copyOf(Iterables.transform(ImmutableList.copyOf(taskIds),
- taskId -> new ScheduledTask().setAssignedTask(new AssignedTask().setTaskId(taskId)))));
- }
-
- private RemoveTasks createRemoveTasks(String... taskIds) {
- return new RemoveTasks(ImmutableSet.copyOf(taskIds));
- }
-
private void expectFrames(Position position, Message message) throws CodingException {
expect(stream.append(entryEq(message.header))).andReturn(position);
for (LogEntry chunk : message.chunks) {