You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/12/15 20:51:10 UTC
[1/2] aurora git commit: Add a storage recovery tool
Repository: aurora
Updated Branches:
refs/heads/master 6fd765bcf -> 2e1ca4288
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
deleted file mode 100644
index 2ad4e84..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.util.Map;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.stats.Stats;
-import org.apache.aurora.common.util.testing.FakeBuildInfo;
-import org.apache.aurora.common.util.testing.FakeClock;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.CronCollisionPolicy;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.InstanceTaskConfig;
-import org.apache.aurora.gen.JobConfiguration;
-import org.apache.aurora.gen.JobInstanceUpdateEvent;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.JobUpdate;
-import org.apache.aurora.gen.JobUpdateAction;
-import org.apache.aurora.gen.JobUpdateDetails;
-import org.apache.aurora.gen.JobUpdateEvent;
-import org.apache.aurora.gen.JobUpdateInstructions;
-import org.apache.aurora.gen.JobUpdateKey;
-import org.apache.aurora.gen.JobUpdateSettings;
-import org.apache.aurora.gen.JobUpdateState;
-import org.apache.aurora.gen.JobUpdateStatus;
-import org.apache.aurora.gen.JobUpdateSummary;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.Range;
-import org.apache.aurora.gen.storage.QuotaConfiguration;
-import org.apache.aurora.gen.storage.SchedulerMetadata;
-import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.gen.storage.StoredCronJob;
-import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.durability.Loader;
-import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
-import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
-import org.junit.Test;
-
-import static org.apache.aurora.common.util.testing.FakeBuildInfo.generateBuildInfo;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.THRIFT_BACKFILL;
-import static org.apache.aurora.scheduler.resources.ResourceManager.aggregateFromBag;
-import static org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.SNAPSHOT_RESTORE;
-import static org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.SNAPSHOT_SAVE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-public class SnapshotStoreImplIT {
-
- private static final long NOW = 10335463456L;
- private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "job");
-
- private Storage storage;
- private SnapshotStoreImpl snapshotter;
-
- private void setUpStore() {
- storage = MemStorageModule.newEmptyStorage();
- FakeClock clock = new FakeClock();
- clock.setNowMillis(NOW);
- snapshotter = new SnapshotStoreImpl(generateBuildInfo(), clock);
- Stats.flush();
- }
-
- @Test
- public void testBackfill() {
- setUpStore();
- storage.write((NoResult.Quiet) stores ->
- Loader.load(
- stores,
- THRIFT_BACKFILL,
- snapshotter.asStream(makeNonBackfilled()).map(Edit::op)));
-
- assertEquals(expected(), storage.write(snapshotter::from));
- assertSnapshotRestoreStats(1L);
- assertSnapshotSaveStats(1L);
- }
-
- private static final IScheduledTask TASK = TaskTestUtil.makeTask("id", JOB_KEY);
- private static final ITaskConfig TASK_CONFIG = TaskTestUtil.makeConfig(JOB_KEY);
- private static final IJobConfiguration CRON_JOB = IJobConfiguration.build(new JobConfiguration()
- .setKey(new JobKey("owner", "env", "name"))
- .setOwner(new Identity("user"))
- .setCronSchedule("* * * * *")
- .setCronCollisionPolicy(CronCollisionPolicy.KILL_EXISTING)
- .setInstanceCount(1)
- .setTaskConfig(TASK_CONFIG.newBuilder()));
- private static final String ROLE = "role";
- private static final IResourceAggregate QUOTA =
- ThriftBackfill.backfillResourceAggregate(aggregateFromBag(ResourceBag.LARGE).newBuilder());
- private static final IHostAttributes ATTRIBUTES = IHostAttributes.build(
- new HostAttributes("host", ImmutableSet.of(new Attribute("attr", ImmutableSet.of("value"))))
- .setMode(MaintenanceMode.NONE)
- .setSlaveId("slave id"));
- private static final String FRAMEWORK_ID = "framework_id";
- private static final Map<String, String> METADATA = ImmutableMap.of(
- FakeBuildInfo.DATE, FakeBuildInfo.DATE,
- FakeBuildInfo.GIT_REVISION, FakeBuildInfo.GIT_REVISION,
- FakeBuildInfo.GIT_TAG, FakeBuildInfo.GIT_TAG);
- private static final IJobUpdateKey UPDATE_ID =
- IJobUpdateKey.build(new JobUpdateKey(JOB_KEY.newBuilder(), "updateId1"));
- private static final IJobUpdateDetails UPDATE = IJobUpdateDetails.build(new JobUpdateDetails()
- .setUpdate(new JobUpdate()
- .setInstructions(new JobUpdateInstructions()
- .setDesiredState(new InstanceTaskConfig()
- .setTask(TASK_CONFIG.newBuilder())
- .setInstances(ImmutableSet.of(new Range(0, 7))))
- .setInitialState(ImmutableSet.of(
- new InstanceTaskConfig()
- .setInstances(ImmutableSet.of(new Range(0, 1)))
- .setTask(TASK_CONFIG.newBuilder())))
- .setSettings(new JobUpdateSettings()
- .setBlockIfNoPulsesAfterMs(500)
- .setUpdateGroupSize(1)
- .setMaxPerInstanceFailures(1)
- .setMaxFailedInstances(1)
- .setMinWaitInInstanceRunningMs(200)
- .setRollbackOnFailure(true)
- .setWaitForBatchCompletion(true)
- .setUpdateOnlyTheseInstances(ImmutableSet.of(new Range(0, 0)))))
- .setSummary(new JobUpdateSummary()
- .setState(new JobUpdateState().setStatus(JobUpdateStatus.ERROR))
- .setUser("user")
- .setKey(UPDATE_ID.newBuilder())))
- .setUpdateEvents(ImmutableList.of(new JobUpdateEvent()
- .setUser("user")
- .setMessage("message")
- .setStatus(JobUpdateStatus.ERROR)))
- .setInstanceEvents(ImmutableList.of(new JobInstanceUpdateEvent()
- .setAction(JobUpdateAction.INSTANCE_UPDATED))));
-
- private Snapshot expected() {
- return new Snapshot()
- .setTimestamp(NOW)
- .setTasks(ImmutableSet.of(TASK.newBuilder()))
- .setQuotaConfigurations(ImmutableSet.of(new QuotaConfiguration(ROLE, QUOTA.newBuilder())))
- .setHostAttributes(ImmutableSet.of(ATTRIBUTES.newBuilder()))
- .setCronJobs(ImmutableSet.of(new StoredCronJob(CRON_JOB.newBuilder())))
- .setSchedulerMetadata(new SchedulerMetadata(FRAMEWORK_ID, METADATA))
- .setJobUpdateDetails(ImmutableSet.of(
- new StoredJobUpdateDetails().setDetails(UPDATE.newBuilder())));
- }
-
- private Snapshot makeNonBackfilled() {
- return expected();
- }
-
- private void assertSnapshotSaveStats(long count) {
- for (String stat : snapshotter.snapshotFieldNames()) {
- assertEquals(count, Stats.getVariable(SNAPSHOT_SAVE + stat + "_events").read());
- assertNotNull(Stats.getVariable(SNAPSHOT_SAVE + stat + "_nanos_total"));
- }
- }
-
- private void assertSnapshotRestoreStats(long count) {
- for (String stat : snapshotter.snapshotFieldNames()) {
- assertEquals(count, Stats.getVariable(SNAPSHOT_RESTORE + stat + "_events").read());
- assertNotNull(Stats.getVariable(SNAPSHOT_RESTORE + stat + "_nanos_total"));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotterImplIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotterImplIT.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotterImplIT.java
new file mode 100644
index 0000000..be07361
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotterImplIT.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.common.util.testing.FakeBuildInfo;
+import org.apache.aurora.common.util.testing.FakeClock;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.JobInstanceUpdateEvent;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateAction;
+import org.apache.aurora.gen.JobUpdateDetails;
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.JobUpdateSettings;
+import org.apache.aurora.gen.JobUpdateState;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.Range;
+import org.apache.aurora.gen.storage.QuotaConfiguration;
+import org.apache.aurora.gen.storage.SchedulerMetadata;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.gen.storage.StoredCronJob;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.durability.Loader;
+import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
+import org.junit.Test;
+
+import static org.apache.aurora.common.util.testing.FakeBuildInfo.generateBuildInfo;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.THRIFT_BACKFILL;
+import static org.apache.aurora.scheduler.resources.ResourceManager.aggregateFromBag;
+import static org.apache.aurora.scheduler.storage.log.SnapshotterImpl.SNAPSHOT_RESTORE;
+import static org.apache.aurora.scheduler.storage.log.SnapshotterImpl.SNAPSHOT_SAVE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class SnapshotterImplIT {
+
+ private static final long NOW = 10335463456L;
+ private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "job");
+
+ private Storage storage;
+ private SnapshotterImpl snapshotter;
+
+ private void setUpStore() {
+ storage = MemStorageModule.newEmptyStorage();
+ FakeClock clock = new FakeClock();
+ clock.setNowMillis(NOW);
+ snapshotter = new SnapshotterImpl(generateBuildInfo(), clock);
+ Stats.flush();
+ }
+
+ @Test
+ public void testBackfill() {
+ setUpStore();
+ storage.write((NoResult.Quiet) stores ->
+ Loader.load(
+ stores,
+ THRIFT_BACKFILL,
+ snapshotter.asStream(makeNonBackfilled()).map(Edit::op)));
+
+ assertEquals(expected(), storage.write(snapshotter::from));
+ assertSnapshotRestoreStats(1L);
+ assertSnapshotSaveStats(1L);
+ }
+
+ private static final IScheduledTask TASK = TaskTestUtil.makeTask("id", JOB_KEY);
+ private static final ITaskConfig TASK_CONFIG = TaskTestUtil.makeConfig(JOB_KEY);
+ private static final IJobConfiguration CRON_JOB = IJobConfiguration.build(new JobConfiguration()
+ .setKey(new JobKey("owner", "env", "name"))
+ .setOwner(new Identity("user"))
+ .setCronSchedule("* * * * *")
+ .setCronCollisionPolicy(CronCollisionPolicy.KILL_EXISTING)
+ .setInstanceCount(1)
+ .setTaskConfig(TASK_CONFIG.newBuilder()));
+ private static final String ROLE = "role";
+ private static final IResourceAggregate QUOTA =
+ ThriftBackfill.backfillResourceAggregate(aggregateFromBag(ResourceBag.LARGE).newBuilder());
+ private static final IHostAttributes ATTRIBUTES = IHostAttributes.build(
+ new HostAttributes("host", ImmutableSet.of(new Attribute("attr", ImmutableSet.of("value"))))
+ .setMode(MaintenanceMode.NONE)
+ .setSlaveId("slave id"));
+ private static final String FRAMEWORK_ID = "framework_id";
+ private static final Map<String, String> METADATA = ImmutableMap.of(
+ FakeBuildInfo.DATE, FakeBuildInfo.DATE,
+ FakeBuildInfo.GIT_REVISION, FakeBuildInfo.GIT_REVISION,
+ FakeBuildInfo.GIT_TAG, FakeBuildInfo.GIT_TAG);
+ private static final IJobUpdateKey UPDATE_ID =
+ IJobUpdateKey.build(new JobUpdateKey(JOB_KEY.newBuilder(), "updateId1"));
+ private static final IJobUpdateDetails UPDATE = IJobUpdateDetails.build(new JobUpdateDetails()
+ .setUpdate(new JobUpdate()
+ .setInstructions(new JobUpdateInstructions()
+ .setDesiredState(new InstanceTaskConfig()
+ .setTask(TASK_CONFIG.newBuilder())
+ .setInstances(ImmutableSet.of(new Range(0, 7))))
+ .setInitialState(ImmutableSet.of(
+ new InstanceTaskConfig()
+ .setInstances(ImmutableSet.of(new Range(0, 1)))
+ .setTask(TASK_CONFIG.newBuilder())))
+ .setSettings(new JobUpdateSettings()
+ .setBlockIfNoPulsesAfterMs(500)
+ .setUpdateGroupSize(1)
+ .setMaxPerInstanceFailures(1)
+ .setMaxFailedInstances(1)
+ .setMinWaitInInstanceRunningMs(200)
+ .setRollbackOnFailure(true)
+ .setWaitForBatchCompletion(true)
+ .setUpdateOnlyTheseInstances(ImmutableSet.of(new Range(0, 0)))))
+ .setSummary(new JobUpdateSummary()
+ .setState(new JobUpdateState().setStatus(JobUpdateStatus.ERROR))
+ .setUser("user")
+ .setKey(UPDATE_ID.newBuilder())))
+ .setUpdateEvents(ImmutableList.of(new JobUpdateEvent()
+ .setUser("user")
+ .setMessage("message")
+ .setStatus(JobUpdateStatus.ERROR)))
+ .setInstanceEvents(ImmutableList.of(new JobInstanceUpdateEvent()
+ .setAction(JobUpdateAction.INSTANCE_UPDATED))));
+
+ private Snapshot expected() {
+ return new Snapshot()
+ .setTimestamp(NOW)
+ .setTasks(ImmutableSet.of(TASK.newBuilder()))
+ .setQuotaConfigurations(ImmutableSet.of(new QuotaConfiguration(ROLE, QUOTA.newBuilder())))
+ .setHostAttributes(ImmutableSet.of(ATTRIBUTES.newBuilder()))
+ .setCronJobs(ImmutableSet.of(new StoredCronJob(CRON_JOB.newBuilder())))
+ .setSchedulerMetadata(new SchedulerMetadata(FRAMEWORK_ID, METADATA))
+ .setJobUpdateDetails(ImmutableSet.of(
+ new StoredJobUpdateDetails().setDetails(UPDATE.newBuilder())));
+ }
+
+ private Snapshot makeNonBackfilled() {
+ return expected();
+ }
+
+ private void assertSnapshotSaveStats(long count) {
+ for (String stat : snapshotter.snapshotFieldNames()) {
+ assertEquals(count, Stats.getVariable(SNAPSHOT_SAVE + stat + "_events").read());
+ assertNotNull(Stats.getVariable(SNAPSHOT_SAVE + stat + "_nanos_total"));
+ }
+ }
+
+ private void assertSnapshotRestoreStats(long count) {
+ for (String stat : snapshotter.snapshotFieldNames()) {
+ assertEquals(count, Stats.getVariable(SNAPSHOT_RESTORE + stat + "_events").read());
+ assertNotNull(Stats.getVariable(SNAPSHOT_RESTORE + stat + "_nanos_total"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
index 1500bda..8f9a77f 100755
--- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
+++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
@@ -499,6 +499,41 @@ test_thermos_profile() {
[[ "$read_env_output" = "hello" ]]
}
+BACKUPS_DIR='/var/lib/aurora/backups'
+REPLICATED_LOG_DIR='/var/db/aurora'
+
+# Exercises the recovery tool end to end: takes a backup, wipes and re-initializes the replicated
+# log, restores the backup into the log, restarts the scheduler, and checks that a job update
+# that existed before the wipe can still be fetched.
+#
+# Arguments:
+#   $1 - name of the cluster to run the aurora client commands against.
+test_recovery_tool() {
+  local _cluster=$1
+  # Declared separately from their assignments so that `local` does not replace the exit status
+  # of the command substitutions below.
+  local update backup
+
+  # As a cursory data validation step, fetch an arbitrary job update to ensure it exists after
+  # recovery completes.
+  update=$(aurora update list "$_cluster" --write-json | jq -r '.[0] | .job + " " + .id')
+
+  # Take a backup
+  aurora_admin scheduler_backup_now "$_cluster"
+  sudo stop aurora-scheduler
+
+  # Reset storage
+  sudo rm -r "$REPLICATED_LOG_DIR"
+  sudo mesos-log initialize --path="$REPLICATED_LOG_DIR"
+
+  # Identify the newest backup file
+  backup=$(basename "$(ls -dtr1 "$BACKUPS_DIR"/* | tail -n1)")
+
+  # Recover
+  sudo /home/vagrant/aurora/dist/install/aurora-scheduler/bin/recovery-tool \
+    -from BACKUP \
+    -to LOG \
+    -backup "$BACKUPS_DIR/$backup" \
+    -native_log_zk_group_path=/aurora/replicated-log \
+    -native_log_file_path="$REPLICATED_LOG_DIR" \
+    -zk_endpoints=localhost:2181
+  sudo start aurora-scheduler
+
+  # This command exits non-zero if the update is not found.
+  # $update is intentionally unquoted: it holds "<job> <id>" and must split into two arguments.
+  aurora update info $update
+}
+
test_http_example() {
local _cluster=$1 _role=$2 _env=$3
local _base_config=$4 _updated_config=$5
@@ -793,6 +828,8 @@ test_ephemeral_daemon_with_final "${TEST_JOB_EPHEMERAL_DAEMON_WITH_FINAL_ARGS[@]
test_daemonizing_process "${TEST_DAEMONIZING_PROCESS_ARGS[@]}"
+test_recovery_tool $TEST_CLUSTER
+
/vagrant/src/test/sh/org/apache/aurora/e2e/test_kerberos_end_to_end.sh
/vagrant/src/test/sh/org/apache/aurora/e2e/test_bypass_leader_redirect_end_to_end.sh
RETCODE=0
[2/2] aurora git commit: Add a storage recovery tool
Posted by wf...@apache.org.
Add a storage recovery tool
This tool was originally intended as a migration path between Persistence
backends. As it turns out, the model also works well for recovering from a
backup.
I propose we drop our current recovery mechanism to use this tool. The existing
recovery-via-scheduler-rpc is slightly nonsensical, as it assumes a healthy
scheduler. When an operator decides it is necessary to recover from a backup,
we should assume the scheduler state may be broken. Furthermore, starting an
empty scheduler to bootstrap can have undesirable effects such as advertising
false state to clients and establishing a new empty framework with the master.
Testing Done:
end-to-end tests pass (and exercise recovery tool)
Reviewed at https://reviews.apache.org/r/64625/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/2e1ca428
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/2e1ca428
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/2e1ca428
Branch: refs/heads/master
Commit: 2e1ca42887bc8ea1e8c6cddebe9d1cf29268c714
Parents: 6fd765b
Author: Bill Farner <wf...@apache.org>
Authored: Fri Dec 15 12:07:37 2017 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Fri Dec 15 12:07:37 2017 -0800
----------------------------------------------------------------------
build.gradle | 13 +
config/checkstyle/suppressions.xml | 2 +
.../aurora/benchmark/SnapshotBenchmarks.java | 10 +-
.../aurora/scheduler/app/SchedulerMain.java | 12 +-
.../aurora/scheduler/config/CliOptions.java | 6 +-
.../discovery/ServiceDiscoveryBindings.java | 2 +-
.../scheduler/storage/backup/BackupReader.java | 56 ++++
.../scheduler/storage/backup/Recovery.java | 35 +-
.../storage/backup/TemporaryStorage.java | 4 +-
.../durability/DurableStorageModule.java | 35 ++
.../scheduler/storage/durability/Recovery.java | 119 +++++++
.../storage/durability/RecoveryTool.java | 196 +++++++++++
.../storage/log/LogPersistenceModule.java | 78 +++++
.../scheduler/storage/log/LogStorageModule.java | 110 ------
.../scheduler/storage/log/SnapshotModule.java | 54 +++
.../storage/log/SnapshotStoreImpl.java | 332 -------------------
.../scheduler/storage/log/SnapshotterImpl.java | 332 +++++++++++++++++++
.../aurora/scheduler/app/SchedulerIT.java | 12 +-
.../scheduler/config/CommandLineTest.java | 4 +-
.../storage/durability/RecoveryTest.java | 110 ++++++
.../storage/log/LogPersistenceTest.java | 6 +-
.../storage/log/NonVolatileStorageTest.java | 12 +-
.../storage/log/SnapshotServiceTest.java | 7 +-
.../storage/log/SnapshotStoreImplIT.java | 189 -----------
.../storage/log/SnapshotterImplIT.java | 189 +++++++++++
.../sh/org/apache/aurora/e2e/test_end_to_end.sh | 37 +++
26 files changed, 1284 insertions(+), 678 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4674513..64af7ae 100644
--- a/build.gradle
+++ b/build.gradle
@@ -623,3 +623,16 @@ startScripts {
unixScript.text = unixScript.text.replace('CLASSPATH=', "CLASSPATH=${environmentClasspathPrefix}:")
}
}
+
+// Include a script to run the recovery tool.
+// Defines a second CreateStartScripts task, named 'recovery-tool', whose main class is
+// RecoveryTool. Scripts are written to <buildDir>/scripts, and the classpath is the project jar
+// plus the runtime configuration.
+task moreStartScripts(type: CreateStartScripts) {
+ mainClassName = 'org.apache.aurora.scheduler.storage.durability.RecoveryTool'
+ applicationName = 'recovery-tool'
+ outputDir = new File(project.buildDir, 'scripts')
+ classpath = jar.outputs.files + project.configurations.runtime
+}
+
+// Add the output of moreStartScripts to the distribution's bin directory with file mode 0755.
+applicationDistribution.into('bin') {
+ from(moreStartScripts)
+ fileMode = 0755
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/config/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/config/checkstyle/suppressions.xml b/config/checkstyle/suppressions.xml
index c4081b9..03f57c8 100644
--- a/config/checkstyle/suppressions.xml
+++ b/config/checkstyle/suppressions.xml
@@ -21,6 +21,8 @@ limitations under the License.
<!-- Allow use of System.exit() in main. -->
<suppress files="org/apache/aurora/scheduler/config/CommandLine.java"
checks="RegexpSinglelineJava"/>
+ <suppress files="org/apache/aurora/scheduler/storage/durability/RecoveryTool.java"
+ checks="RegexpSinglelineJava"/>
<suppress files="org/apache/aurora/scheduler/storage/db/migration/.*" checks="TypeName" />
<suppress files="org/apache/aurora/scheduler/storage/db/testmigration/.*" checks="TypeName" />
</suppressions>
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
index 4f99f80..e3ed3f2 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
@@ -27,7 +27,7 @@ import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.Clock;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
+import org.apache.aurora.scheduler.storage.log.SnapshotterImpl;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.thrift.TException;
import org.openjdk.jmh.annotations.Benchmark;
@@ -56,7 +56,7 @@ public class SnapshotBenchmarks {
@Threads(1)
@State(Scope.Thread)
public static class RestoreSnapshotWithUpdatesBenchmark {
- private SnapshotStoreImpl snapshotStore;
+ private SnapshotterImpl snapshotStore;
private Snapshot snapshot;
private Storage storage;
@@ -80,21 +80,21 @@ public class SnapshotBenchmarks {
return System.currentTimeMillis() % 5 == 0;
}
- private SnapshotStoreImpl getSnapshotStore() {
+ private SnapshotterImpl getSnapshotStore() {
Injector injector = Guice.createInjector(
new AbstractModule() {
@Override
protected void configure() {
bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
bind(StatsProvider.class).toInstance(new FakeStatsProvider());
- bind(SnapshotStoreImpl.class).in(Singleton.class);
+ bind(SnapshotterImpl.class).in(Singleton.class);
}
},
new MemStorageModule());
storage = injector.getInstance(Key.get(Storage.class, Storage.Volatile.class));
storage.prepare();
- return injector.getInstance(SnapshotStoreImpl.class);
+ return injector.getInstance(SnapshotterImpl.class);
}
private Snapshot createSnapshot(int updates, int events, int instanceEvents) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index 2bf7e7b..3ce9bc2 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -59,9 +59,11 @@ import org.apache.aurora.scheduler.mesos.LibMesosLoadingModule;
import org.apache.aurora.scheduler.stats.StatsModule;
import org.apache.aurora.scheduler.storage.Storage.Volatile;
import org.apache.aurora.scheduler.storage.backup.BackupModule;
+import org.apache.aurora.scheduler.storage.durability.DurableStorageModule;
import org.apache.aurora.scheduler.storage.entities.IServerInfo;
-import org.apache.aurora.scheduler.storage.log.LogStorageModule;
-import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
+import org.apache.aurora.scheduler.storage.log.LogPersistenceModule;
+import org.apache.aurora.scheduler.storage.log.SnapshotModule;
+import org.apache.aurora.scheduler.storage.log.SnapshotterImpl;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -210,7 +212,7 @@ public class SchedulerMain {
new ServiceDiscoveryModule(
FlaggedZooKeeperConfig.create(options.zk),
options.main.serversetPath),
- new BackupModule(options.backup, SnapshotStoreImpl.class),
+ new BackupModule(options.backup, SnapshotterImpl.class),
new ExecutorModule(options.executor),
new AbstractModule() {
@Override
@@ -249,8 +251,10 @@ public class SchedulerMain {
.add(
new CommandLineDriverSettingsModule(options.driver, options.main.allowGpuResource),
new LibMesosLoadingModule(options.main.driverImpl),
+ new DurableStorageModule(),
new MesosLogStreamModule(options.mesosLog, FlaggedZooKeeperConfig.create(options.zk)),
- new LogStorageModule(options.logStorage),
+ new LogPersistenceModule(options.logPersistence),
+ new SnapshotModule(options.snapshot),
new TierModule(options.tiers),
new WebhookModule(options.webhook)
)
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
index b7f43e0..e4e5358 100644
--- a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
+++ b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
@@ -48,7 +48,8 @@ import org.apache.aurora.scheduler.state.StateModule;
import org.apache.aurora.scheduler.stats.AsyncStatsModule;
import org.apache.aurora.scheduler.stats.StatsModule;
import org.apache.aurora.scheduler.storage.backup.BackupModule;
-import org.apache.aurora.scheduler.storage.log.LogStorageModule;
+import org.apache.aurora.scheduler.storage.log.LogPersistenceModule;
+import org.apache.aurora.scheduler.storage.log.SnapshotModule;
import org.apache.aurora.scheduler.thrift.aop.AopModule;
import org.apache.aurora.scheduler.updater.UpdaterModule;
@@ -64,7 +65,8 @@ public class CliOptions {
public final FlaggedZooKeeperConfig.Options zk = new FlaggedZooKeeperConfig.Options();
public final UpdaterModule.Options updater = new UpdaterModule.Options();
public final StateModule.Options state = new StateModule.Options();
- public final LogStorageModule.Options logStorage = new LogStorageModule.Options();
+ public final LogPersistenceModule.Options logPersistence = new LogPersistenceModule.Options();
+ public final SnapshotModule.Options snapshot = new SnapshotModule.Options();
public final BackupModule.Options backup = new BackupModule.Options();
public final AopModule.Options aop = new AopModule.Options();
public final PruningModule.Options pruning = new PruningModule.Options();
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java
index b574c13..a57a77c 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java
@@ -44,7 +44,7 @@ public final class ServiceDiscoveryBindings {
/**
* A binding key for the ZooKeeper cluster endpoints.
*/
- static final Key<Iterable<InetSocketAddress>> ZOO_KEEPER_CLUSTER_KEY =
+ public static final Key<Iterable<InetSocketAddress>> ZOO_KEEPER_CLUSTER_KEY =
Key.get(new TypeLiteral<Iterable<InetSocketAddress>>() { }, ZooKeeper.class);
/**
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupReader.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupReader.java
new file mode 100644
index 0000000..82d712c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupReader.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.backup;
+
+import java.io.File;
+import java.util.stream.Stream;
+
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.durability.Persistence;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A read-only {@link Persistence} backed by a single backup file, intended to be used as the
+ * source side of a migration.
+ */
+public class BackupReader implements Persistence {
+
+  private final File backupFile;
+  private final Snapshotter snapshotter;
+
+  /**
+   * Creates a reader for one backup file.
+   *
+   * @param backupFile File holding the backup to read, must not be null.
+   * @param snapshotter Used to turn the loaded snapshot into a stream, must not be null.
+   */
+  public BackupReader(File backupFile, Snapshotter snapshotter) {
+    this.backupFile = requireNonNull(backupFile);
+    this.snapshotter = requireNonNull(snapshotter);
+  }
+
+  /**
+   * Loads the backup file and streams its contents as edits.
+   *
+   * @return The stream produced by the snapshotter for the loaded backup, mapped to edits.
+   * @throws PersistenceException If the backup file does not exist.
+   */
+  @Override
+  public Stream<Edit> recover() throws PersistenceException {
+    if (backupFile.exists()) {
+      return snapshotter.asStream(Recovery.load(backupFile)).map(Edit::op);
+    }
+
+    throw new PersistenceException("Backup " + backupFile + " does not exist.");
+  }
+
+  /**
+   * Nothing needs to be prepared before reading a backup file.
+   */
+  @Override
+  public void prepare() {
+    // no-op
+  }
+
+  /**
+   * Always fails, since this persistence only supports reading.
+   *
+   * @throws UnsupportedOperationException On every call.
+   */
+  @Override
+  public void persist(Stream<Op> records) {
+    throw new UnsupportedOperationException("Backups are read-only");
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
index 79899a0..3c2ea50 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
@@ -136,22 +136,7 @@ public interface Recovery {
@Override
public void stage(String backupName) throws RecoveryException {
- File backupFile = new File(backupDir, backupName);
- if (!backupFile.exists()) {
- throw new RecoveryException("Backup " + backupName + " does not exist.");
- }
-
- Snapshot snapshot = new Snapshot();
- try {
- TBinaryProtocol prot = new TBinaryProtocol(
- new TIOStreamTransport(new BufferedInputStream(new FileInputStream(backupFile))));
-
- snapshot.read(prot);
- } catch (TException e) {
- throw new RecoveryException("Failed to decode backup " + e, e);
- } catch (IOException e) {
- throw new RecoveryException("Failed to read backup " + e, e);
- }
+ Snapshot snapshot = load(new File(backupDir, backupName));
boolean applied =
recovery.compareAndSet(null, new PendingRecovery(tempStorageFactory.apply(snapshot)));
if (!applied) {
@@ -214,4 +199,22 @@ public interface Recovery {
}
}
}
+
+  /**
+   * Reads a binary-protocol encoded {@link Snapshot} from a backup file.
+   *
+   * @param backupFile File to read the snapshot from.
+   * @return The decoded snapshot.
+   * @throws RecoveryException If the file is missing, unreadable, or cannot be decoded.
+   */
+  static Snapshot load(File backupFile) throws RecoveryException {
+    if (!backupFile.exists()) {
+      throw new RecoveryException("Backup " + backupFile + " does not exist.");
+    }
+
+    // try-with-resources releases the file handle on success and on decode/read failure.
+    try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(backupFile))) {
+      Snapshot snapshot = new Snapshot();
+      snapshot.read(new TBinaryProtocol(new TIOStreamTransport(in)));
+      return snapshot;
+    } catch (TException e) {
+      throw new RecoveryException("Failed to decode backup " + e, e);
+    } catch (IOException e) {
+      throw new RecoveryException("Failed to read backup " + e, e);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
index 0305d9d..5641738 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -31,7 +31,7 @@ import org.apache.aurora.scheduler.storage.durability.Loader;
import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
+import org.apache.aurora.scheduler.storage.log.SnapshotterImpl;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import static java.util.Objects.requireNonNull;
@@ -84,7 +84,7 @@ interface TemporaryStorage {
BuildInfo buildInfo = generateBuildInfo();
FakeClock clock = new FakeClock();
clock.setNowMillis(snapshot.getTimestamp());
- Snapshotter snapshotter = new SnapshotStoreImpl(buildInfo, clock);
+ Snapshotter snapshotter = new SnapshotterImpl(buildInfo, clock);
storage.write((NoResult.Quiet) stores -> {
Loader.load(stores, thriftBackfill, snapshotter.asStream(snapshot).map(Edit::op));
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorageModule.java
new file mode 100644
index 0000000..6bb134a
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorageModule.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import javax.inject.Singleton;
+
+import com.google.inject.PrivateModule;
+
+import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
+
+/**
+ * Private binding module for a durable storage layer. Only {@link Storage} and
+ * {@link NonVolatileStorage} are exposed outside of this module.
+ */
+public class DurableStorageModule extends PrivateModule {
+  @Override
+  protected void configure() {
+    // A single DurableStorage instance is shared within this module.
+    bind(DurableStorage.class).in(Singleton.class);
+
+    // Bindings supplied by the call order enforcing wrapper around DurableStorage.
+    install(CallOrderEnforcingStorage.wrappingModule(DurableStorage.class));
+
+    expose(NonVolatileStorage.class);
+    expose(Storage.class);
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/durability/Recovery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/Recovery.java
new file mode 100644
index 0000000..819d70e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/Recovery.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+
+import com.google.common.collect.Lists;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
+import org.apache.aurora.scheduler.storage.durability.Persistence.PersistenceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to clone a persistence.
+ */
+final class Recovery {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Recovery.class);
+
+  private Recovery() {
+    // utility class.
+  }
+
+  /**
+   * Copies all state from one persistence to another, batching into calls to
+   * {@link Persistence#persist(Stream)}.
+   *
+   * <p>Storage reset instructions recovered from the source are dropped rather than copied, and
+   * are only tolerated before the first data edit.
+   *
+   * @param from Source.
+   * @param to Destination, which must recover as empty.
+   * @param batchSize Maximum number of entries to include in any given persist.
+   * @throws IllegalStateException If the destination is not empty, or if the source produces a
+   *     storage reset instruction after data has started.
+   * @throws RuntimeException Wrapping any {@link PersistenceException} raised while recovering
+   *     from the source or persisting to the destination.
+   */
+  static void copy(Persistence from, Persistence to, int batchSize) {
+    requireEmpty(to);
+
+    long start = System.nanoTime();
+    AtomicLong count = new AtomicLong();
+    AtomicInteger batchNumber = new AtomicInteger();
+    List<Op> batch = Lists.newArrayListWithExpectedSize(batchSize);
+    Runnable saveBatch = () -> {
+      LOG.info("Saving batch {}", batchNumber.incrementAndGet());
+      try {
+        to.persist(batch.stream());
+      } catch (PersistenceException e) {
+        throw new RuntimeException(e);
+      }
+      // The same list is reused for the next batch.
+      batch.clear();
+    };
+
+    AtomicBoolean dataBegin = new AtomicBoolean(false);
+    // Close the source stream once copying ends, whether normally or with an exception, the same
+    // way requireEmpty() closes the stream it opens.
+    try (Stream<Edit> edits = from.recover()) {
+      edits
+          .filter(edit -> isData(edit, dataBegin))
+          .forEach(edit -> {
+            count.incrementAndGet();
+            batch.add(edit.getOp());
+            if (batch.size() == batchSize) {
+              saveBatch.run();
+              LOG.info("Fetching batch");
+            }
+          });
+    } catch (PersistenceException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Flush whatever is left over after the last full batch.
+    if (!batch.isEmpty()) {
+      saveBatch.run();
+    }
+    long end = System.nanoTime();
+    LOG.info("Recovery finished");
+    LOG.info(
+        "Copied {} ops in {} ms",
+        count.get(),
+        Amount.of(end - start, Time.NANOSECONDS).as(Time.MILLISECONDS));
+  }
+
+  /**
+   * Decides whether a recovered edit carries data that should be copied.
+   *
+   * @param edit Edit recovered from the source.
+   * @param dataBegin Set to true once the first data edit has been seen.
+   * @return {@code false} for a storage reset instruction, {@code true} otherwise.
+   * @throws IllegalStateException If a storage reset instruction arrives after data has started.
+   */
+  private static boolean isData(Edit edit, AtomicBoolean dataBegin) {
+    if (edit.isDeleteAll()) {
+      // Suppress any storage reset instructions.
+      // Persistence implementations may recover with these, but do not support persisting
+      // them. As a result, we require that the recovery source produces a reset
+      // instruction at the beginning of the stream, if at all.
+      if (dataBegin.get()) {
+        throw new IllegalStateException(
+            "A storage reset instruction arrived after the beginning of data");
+      }
+      return false;
+    }
+
+    dataBegin.set(true);
+    return true;
+  }
+
+  /**
+   * Verifies that recovering from a persistence yields no edits.
+   *
+   * @param persistence Persistence that is about to be written to.
+   * @throws IllegalStateException If the persistence produces at least one edit.
+   * @throws RuntimeException Wrapping any {@link PersistenceException} raised while recovering.
+   */
+  private static void requireEmpty(Persistence persistence) {
+    LOG.info("Ensuring recovery destination is empty");
+    try (Stream<Edit> edits = persistence.recover()) {
+      if (edits.findFirst().isPresent()) {
+        throw new IllegalStateException("Refusing to recover into non-empty persistence");
+      }
+    } catch (PersistenceException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/durability/RecoveryTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/RecoveryTool.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/RecoveryTool.java
new file mode 100644
index 0000000..7cb4c52
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/RecoveryTool.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.IStringConverterFactory;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import org.apache.aurora.common.util.BuildInfo;
+import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.scheduler.TierModule;
+import org.apache.aurora.scheduler.app.LifecycleModule;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.config.converters.DataAmountConverter;
+import org.apache.aurora.scheduler.config.converters.InetSocketAddressConverter;
+import org.apache.aurora.scheduler.config.converters.TimeAmountConverter;
+import org.apache.aurora.scheduler.config.types.DataAmount;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
+import org.apache.aurora.scheduler.discovery.FlaggedZooKeeperConfig;
+import org.apache.aurora.scheduler.discovery.ServiceDiscoveryBindings;
+import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.backup.BackupReader;
+import org.apache.aurora.scheduler.storage.log.LogPersistenceModule;
+import org.apache.aurora.scheduler.storage.log.SnapshotterImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A utility to recover the contents of one persistence into another.
+ *
+ * <p>The source and destination are chosen with the {@code -from} and {@code -to} arguments. Each
+ * {@link Endpoint} contributes its own option objects to the argument parser.
+ */
+public final class RecoveryTool {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RecoveryTool.class);
+
+  private RecoveryTool() {
+    // Main-only class.
+  }
+
+  /**
+   * A persistence that can be selected as the source or destination of a recovery.
+   */
+  interface RecoveryEndpoint {
+    /**
+     * Gets the option objects to register with the argument parser for this endpoint.
+     *
+     * @return Objects carrying the endpoint's command line parameters.
+     */
+    Iterable<Object> getOptions();
+
+    /**
+     * Creates the persistence, reading the option objects returned by {@link #getOptions()}.
+     *
+     * @return The persistence for this endpoint.
+     */
+    Persistence create();
+  }
+
+  /**
+   * Endpoint that obtains its {@link Persistence} from an injector built around
+   * {@link LogPersistenceModule}.
+   */
+  private static class Log implements RecoveryEndpoint {
+    private final FlaggedZooKeeperConfig.Options zkOptions = new FlaggedZooKeeperConfig.Options();
+    private final MesosLogStreamModule.Options logOptions = new MesosLogStreamModule.Options();
+    private final LogPersistenceModule.Options options = new LogPersistenceModule.Options();
+
+    @Override
+    public Iterable<Object> getOptions() {
+      return ImmutableList.of(logOptions, options, zkOptions);
+    }
+
+    @Override
+    public Persistence create() {
+      Injector injector = Guice.createInjector(
+          new TierModule(TaskTestUtil.TIER_CONFIG),
+          new MesosLogStreamModule(logOptions, FlaggedZooKeeperConfig.create(zkOptions)),
+          new LogPersistenceModule(options),
+          new LifecycleModule(),
+          new AbstractModule() {
+            @Override
+            protected void configure() {
+              bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY)
+                  .toInstance(zkOptions.zkEndpoints);
+              bind(Snapshotter.class).to(SnapshotterImpl.class);
+              bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
+              bind(BuildInfo.class).toInstance(new BuildInfo());
+            }
+          });
+      return injector.getInstance(Persistence.class);
+    }
+  }
+
+  /**
+   * Endpoint that reads from the backup file named by the {@code -backup} argument.
+   */
+  private static class Backup implements RecoveryEndpoint {
+    @Parameters(separators = "=")
+    private static class Options {
+      @Parameter(names = "-backup", description = "Backup file to load")
+      File backup;
+    }
+
+    private final Options options = new Options();
+
+    @Override
+    public Iterable<Object> getOptions() {
+      return ImmutableList.of(options);
+    }
+
+    @Override
+    public Persistence create() {
+      // -backup is only needed when this endpoint is selected, so it cannot be marked as a
+      // required parameter. Fail with a clear message here rather than letting BackupReader
+      // reject the null file with a message-less NullPointerException.
+      if (options.backup == null) {
+        throw new IllegalArgumentException(
+            "-backup must be specified when using the " + Endpoint.BACKUP + " endpoint");
+      }
+
+      return new BackupReader(
+          options.backup,
+          new SnapshotterImpl(new BuildInfo(), Clock.SYSTEM_CLOCK));
+    }
+  }
+
+  /**
+   * The persistence endpoints accepted as values of {@code -from} and {@code -to}.
+   */
+  enum Endpoint {
+    LOG(new Log()),
+    BACKUP(new Backup());
+
+    private final RecoveryEndpoint impl;
+
+    Endpoint(RecoveryEndpoint impl) {
+      this.impl = impl;
+    }
+  }
+
+  @Parameters(separators = "=")
+  private static class Options {
+    @Parameter(names = "-from",
+        required = true,
+        description = "Persistence to read state from")
+    Endpoint from;
+
+    @Parameter(names = "-to",
+        required = true,
+        description = "Persistence to write recovered state into")
+    Endpoint to;
+
+    @Parameter(names = "-batch-size",
+        description = "Write in batches of this many ops.")
+    int batchSize = 50;
+
+    @Parameter(names = "--help", description = "Print usage", help = true)
+    boolean help;
+  }
+
+  /**
+   * Builds an argument parser covering the tool options and the options of every endpoint, then
+   * parses the supplied arguments into those option objects.
+   *
+   * @param options Tool-level options to populate.
+   * @param args Command line arguments.
+   * @return The parser, after parsing.
+   */
+  private static JCommander configure(Options options, String... args) {
+    JCommander.Builder builder = JCommander.newBuilder().programName(RecoveryTool.class.getName());
+    builder.addConverterFactory(new IStringConverterFactory() {
+      private final Map<Class<?>, Class<? extends IStringConverter<?>>> classConverters =
+          ImmutableMap.<Class<?>, Class<? extends IStringConverter<?>>>builder()
+              .put(DataAmount.class, DataAmountConverter.class)
+              .put(InetSocketAddress.class, InetSocketAddressConverter.class)
+              .put(TimeAmount.class, TimeAmountConverter.class)
+              .build();
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public <T> Class<? extends IStringConverter<T>> getConverter(Class<T> forType) {
+        // Yields null for any type that has no entry in the map above.
+        return (Class<IStringConverter<T>>) classConverters.get(forType);
+      }
+    });
+
+    builder.addObject(options);
+    // Options of all endpoints are registered, not only those of the selected ones, since the
+    // selection is itself only known after parsing.
+    for (Endpoint endpoint : Endpoint.values()) {
+      endpoint.impl.getOptions().forEach(builder::addObject);
+    }
+
+    JCommander parser = builder.build();
+    parser.parse(args);
+    return parser;
+  }
+
+  /**
+   * Parses arguments, prepares both persistences, and copies the source into the destination.
+   *
+   * @param args Command line arguments.
+   */
+  public static void main(String[] args) {
+    Options options = new Options();
+    JCommander parser = configure(options, args);
+    if (options.help) {
+      parser.usage();
+      System.exit(1);
+    }
+
+    LOG.info("Recovering from {} to {}", options.from, options.to);
+    Persistence from = options.from.impl.create();
+    Persistence to = options.to.impl.create();
+
+    from.prepare();
+    to.prepare();
+
+    Recovery.copy(from, to, options.batchSize);
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistenceModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistenceModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistenceModule.java
new file mode 100644
index 0000000..ffe3cbf
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistenceModule.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import javax.inject.Singleton;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Data;
+import org.apache.aurora.scheduler.config.types.DataAmount;
+import org.apache.aurora.scheduler.storage.durability.Persistence;
+import org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl;
+import org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction;
+import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
+import org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl;
+
+/**
+ * Private binding module for the scheduler's distributed log based persistence. Only
+ * {@link Persistence} and {@link LogPersistence} are exposed outside of this module.
+ */
+public class LogPersistenceModule extends PrivateModule {
+
+  /**
+   * Command line options for log based persistence.
+   */
+  @Parameters(separators = "=")
+  public static class Options {
+    @Parameter(names = "-dlog_max_entry_size",
+        description =
+            "Specifies the maximum entry size to append to the log. Larger entries will be "
+                + "split across entry Frames.")
+    public DataAmount maxLogEntrySize = new DataAmount(512, Data.KB);
+  }
+
+  private final Options options;
+
+  /**
+   * Creates the module.
+   *
+   * @param options Options supplying the maximum log entry size.
+   */
+  public LogPersistenceModule(Options options) {
+    this.options = options;
+  }
+
+  @Override
+  protected void configure() {
+    // The persistence implementation, and the only two bindings visible to other modules.
+    bind(LogPersistence.class).in(Singleton.class);
+    bind(Persistence.class).to(LogPersistence.class);
+    expose(LogPersistence.class);
+    expose(Persistence.class);
+
+    // Log manager, along with the configured maximum entry size.
+    bind(LogManager.class).in(Singleton.class);
+    TypeLiteral<Amount<Integer, Data>> entrySizeType =
+        new TypeLiteral<Amount<Integer, Data>>() { };
+    bind(entrySizeType).annotatedWith(MaxEntrySize.class).toInstance(options.maxLogEntrySize);
+
+    // TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5
+    // versus a faster error-detection checksum like CRC32 for large Snapshots.
+    @SuppressWarnings("deprecation")
+    HashFunction entryHash = Hashing.md5();
+    bind(HashFunction.class).annotatedWith(LogEntryHashFunction.class).toInstance(entryHash);
+
+    bind(EntrySerializer.class).to(EntrySerializerImpl.class);
+    bind(SnapshotDeduplicator.class).to(SnapshotDeduplicatorImpl.class);
+
+    install(new FactoryModuleBuilder()
+        .implement(StreamManager.class, StreamManagerImpl.class)
+        .build(StreamManagerFactory.class));
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
deleted file mode 100644
index 671593c..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import javax.inject.Singleton;
-
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import com.google.inject.AbstractModule;
-import com.google.inject.PrivateModule;
-import com.google.inject.TypeLiteral;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Data;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.scheduler.SchedulerServicesModule;
-import org.apache.aurora.scheduler.config.types.DataAmount;
-import org.apache.aurora.scheduler.config.types.TimeAmount;
-import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import org.apache.aurora.scheduler.storage.durability.DurableStorage;
-import org.apache.aurora.scheduler.storage.durability.Persistence;
-import org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl;
-import org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction;
-import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
-import org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl;
-import org.apache.aurora.scheduler.storage.log.SnapshotService.Settings;
-
-/**
- * Bindings for scheduler distributed log based storage.
- */
-public class LogStorageModule extends AbstractModule {
-
- @Parameters(separators = "=")
- public static class Options {
- @Parameter(names = "-dlog_snapshot_interval",
- description = "Specifies the frequency at which snapshots of local storage are taken and "
- + "written to the log.")
- public TimeAmount snapshotInterval = new TimeAmount(1, Time.HOURS);
-
- @Parameter(names = "-dlog_max_entry_size",
- description =
- "Specifies the maximum entry size to append to the log. Larger entries will be "
- + "split across entry Frames.")
- public DataAmount maxLogEntrySize = new DataAmount(512, Data.KB);
- }
-
- private final Options options;
-
- public LogStorageModule(Options options) {
- this.options = options;
- }
-
- @Override
- protected void configure() {
- install(new PrivateModule() {
- @Override
- protected void configure() {
- bind(Settings.class).toInstance(new Settings(options.snapshotInterval));
-
- bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
- .toInstance(options.maxLogEntrySize);
- bind(LogManager.class).in(Singleton.class);
- bind(DurableStorage.class).in(Singleton.class);
-
- install(CallOrderEnforcingStorage.wrappingModule(DurableStorage.class));
- bind(LogPersistence.class).in(Singleton.class);
- bind(Persistence.class).to(LogPersistence.class);
- bind(SnapshotStore.class).to(SnapshotService.class);
- bind(SnapshotService.class).in(Singleton.class);
- expose(SnapshotService.class);
- expose(Persistence.class);
- expose(Storage.class);
- expose(NonVolatileStorage.class);
- expose(SnapshotStore.class);
-
- bind(EntrySerializer.class).to(EntrySerializerImpl.class);
- // TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5
- // versus a faster error-detection checksum like CRC32 for large Snapshots.
- @SuppressWarnings("deprecation")
- HashFunction hashFunction = Hashing.md5();
- bind(HashFunction.class).annotatedWith(LogEntryHashFunction.class).toInstance(hashFunction);
-
- bind(SnapshotDeduplicator.class).to(SnapshotDeduplicatorImpl.class);
-
- install(new FactoryModuleBuilder()
- .implement(StreamManager.class, StreamManagerImpl.class)
- .build(StreamManagerFactory.class));
- }
- });
-
- SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(SnapshotService.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotModule.java
new file mode 100644
index 0000000..8c0fc12
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotModule.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import javax.inject.Singleton;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.inject.AbstractModule;
+
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.log.SnapshotService.Settings;
+
+/**
+ * Binding for a snapshot store and periodic snapshotting service.
+ */
+public class SnapshotModule extends AbstractModule {
+
+  /**
+   * Command line options for snapshotting.
+   */
+  @Parameters(separators = "=")
+  public static class Options {
+    @Parameter(names = "-dlog_snapshot_interval",
+        description = "Specifies the frequency at which snapshots of local storage are taken and "
+            + "written to the log.")
+    public TimeAmount snapshotInterval = new TimeAmount(1, Time.HOURS);
+  }
+
+  private final Options options;
+
+  /**
+   * Creates the module.
+   *
+   * @param options Options supplying the snapshot interval.
+   */
+  public SnapshotModule(Options options) {
+    this.options = options;
+  }
+
+  @Override
+  protected void configure() {
+    Settings settings = new Settings(options.snapshotInterval);
+    bind(Settings.class).toInstance(settings);
+
+    // One SnapshotService instance serves as the SnapshotStore and as the active service below.
+    bind(SnapshotService.class).in(Singleton.class);
+    bind(SnapshotStore.class).to(SnapshotService.class);
+
+    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(SnapshotService.class);
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
deleted file mode 100644
index 50553f8..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Streams;
-
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.stats.SlidingStats;
-import org.apache.aurora.common.stats.SlidingStats.Timeable;
-import org.apache.aurora.common.util.BuildInfo;
-import org.apache.aurora.common.util.Clock;
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.QuotaConfiguration;
-import org.apache.aurora.gen.storage.SaveCronJob;
-import org.apache.aurora.gen.storage.SaveFrameworkId;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
-import org.apache.aurora.gen.storage.SaveJobUpdate;
-import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
-import org.apache.aurora.gen.storage.SaveQuota;
-import org.apache.aurora.gen.storage.SaveTasks;
-import org.apache.aurora.gen.storage.SchedulerMetadata;
-import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.gen.storage.StoredCronJob;
-import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.Snapshotter;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Snapshot store implementation that delegates to underlying snapshot stores by
- * extracting/applying fields in a snapshot thrift struct.
- */
-public class SnapshotStoreImpl implements Snapshotter {
-
- @VisibleForTesting
- static final String SNAPSHOT_SAVE = "snapshot_save_";
- @VisibleForTesting
- static final String SNAPSHOT_RESTORE = "snapshot_restore_";
-
- private static final Logger LOG = LoggerFactory.getLogger(SnapshotStoreImpl.class);
-
- private static final String HOST_ATTRIBUTES_FIELD = "hosts";
- private static final String QUOTA_FIELD = "quota";
- private static final String TASK_FIELD = "tasks";
- private static final String CRON_FIELD = "crons";
- private static final String JOB_UPDATE_FIELD = "job_updates";
- private static final String SCHEDULER_METADATA_FIELD = "scheduler_metadata";
-
- @VisibleForTesting
- Set<String> snapshotFieldNames() {
- return snapshotFields.stream()
- .map(SnapshotField::getName)
- .collect(Collectors.toSet());
- }
-
- private final List<SnapshotField> snapshotFields = ImmutableList.of(
- new SnapshotField() {
- @Override
- String getName() {
- return HOST_ATTRIBUTES_FIELD;
- }
-
- @Override
- void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
- snapshot.setHostAttributes(
- IHostAttributes.toBuildersSet(store.getAttributeStore().getHostAttributes()));
- }
-
- @Override
- Stream<Op> doStreamFrom(Snapshot snapshot) {
- if (snapshot.getHostAttributesSize() > 0) {
- return snapshot.getHostAttributes().stream()
- .map(attributes -> Op.saveHostAttributes(
- new SaveHostAttributes().setHostAttributes(attributes)));
- }
- return Stream.empty();
- }
- },
- new SnapshotField() {
- @Override
- String getName() {
- return TASK_FIELD;
- }
-
- @Override
- void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
- snapshot.setTasks(
- IScheduledTask.toBuildersSet(store.getTaskStore().fetchTasks(Query.unscoped())));
- }
-
- @Override
- Stream<Op> doStreamFrom(Snapshot snapshot) {
- if (snapshot.getTasksSize() > 0) {
- return Stream.of(Op.saveTasks(new SaveTasks().setTasks(snapshot.getTasks())));
- }
- return Stream.empty();
- }
- },
- new SnapshotField() {
- @Override
- String getName() {
- return CRON_FIELD;
- }
-
- @Override
- void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
- ImmutableSet.Builder<StoredCronJob> jobs = ImmutableSet.builder();
-
- for (IJobConfiguration config : store.getCronJobStore().fetchJobs()) {
- jobs.add(new StoredCronJob(config.newBuilder()));
- }
- snapshot.setCronJobs(jobs.build());
- }
-
- @Override
- Stream<Op> doStreamFrom(Snapshot snapshot) {
- if (snapshot.getCronJobsSize() > 0) {
- return snapshot.getCronJobs().stream()
- .map(job -> Op.saveCronJob(
- new SaveCronJob().setJobConfig(job.getJobConfiguration())));
- }
- return Stream.empty();
- }
- },
- new SnapshotField() {
- @Override
- String getName() {
- return SCHEDULER_METADATA_FIELD;
- }
-
- @Override
- void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
- // SchedulerMetadata is updated outside of the static list of SnapshotFields
- }
-
- @Override
- Stream<Op> doStreamFrom(Snapshot snapshot) {
- if (snapshot.isSetSchedulerMetadata()
- && snapshot.getSchedulerMetadata().isSetFrameworkId()) {
- // No delete necessary here since this is a single value.
-
- return Stream.of(Op.saveFrameworkId(
- new SaveFrameworkId().setId(snapshot.getSchedulerMetadata().getFrameworkId())));
- }
- return Stream.empty();
- }
- },
- new SnapshotField() {
- @Override
- String getName() {
- return QUOTA_FIELD;
- }
-
- @Override
- void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
- ImmutableSet.Builder<QuotaConfiguration> quotas = ImmutableSet.builder();
- for (Map.Entry<String, IResourceAggregate> entry
- : store.getQuotaStore().fetchQuotas().entrySet()) {
-
- quotas.add(new QuotaConfiguration(entry.getKey(), entry.getValue().newBuilder()));
- }
-
- snapshot.setQuotaConfigurations(quotas.build());
- }
-
- @Override
- Stream<Op> doStreamFrom(Snapshot snapshot) {
- if (snapshot.getQuotaConfigurationsSize() > 0) {
- return snapshot.getQuotaConfigurations().stream()
- .map(quota -> Op.saveQuota(new SaveQuota()
- .setRole(quota.getRole())
- .setQuota(quota.getQuota())));
- }
- return Stream.empty();
- }
- },
- new SnapshotField() {
- @Override
- String getName() {
- return JOB_UPDATE_FIELD;
- }
-
- @Override
- void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
- snapshot.setJobUpdateDetails(
- store.getJobUpdateStore().fetchJobUpdates(JobUpdateStore.MATCH_ALL).stream()
- .map(u -> new StoredJobUpdateDetails().setDetails(u.newBuilder()))
- .collect(Collectors.toSet()));
- }
-
- @Override
- Stream<Op> doStreamFrom(Snapshot snapshot) {
- if (snapshot.getJobUpdateDetailsSize() > 0) {
- return snapshot.getJobUpdateDetails().stream()
- .flatMap(details -> {
- Stream<Op> parent = Stream.of(Op.saveJobUpdate(
- new SaveJobUpdate().setJobUpdate(details.getDetails().getUpdate())));
- Stream<Op> jobEvents;
- if (details.getDetails().getUpdateEventsSize() > 0) {
- jobEvents = details.getDetails().getUpdateEvents().stream()
- .map(event -> Op.saveJobUpdateEvent(
- new SaveJobUpdateEvent()
- .setKey(details.getDetails().getUpdate().getSummary().getKey())
- .setEvent(event)));
- } else {
- jobEvents = Stream.empty();
- }
-
- Stream<Op> instanceEvents;
- if (details.getDetails().getInstanceEventsSize() > 0) {
- instanceEvents = details.getDetails().getInstanceEvents().stream()
- .map(event -> Op.saveJobInstanceUpdateEvent(
- new SaveJobInstanceUpdateEvent()
- .setKey(details.getDetails().getUpdate().getSummary().getKey())
- .setEvent(event)));
- } else {
- instanceEvents = Stream.empty();
- }
-
- return Streams.concat(parent, jobEvents, instanceEvents);
- });
- }
- return Stream.empty();
- }
- }
- );
-
- private final BuildInfo buildInfo;
- private final Clock clock;
-
- @Inject
- public SnapshotStoreImpl(BuildInfo buildInfo, Clock clock) {
- this.buildInfo = requireNonNull(buildInfo);
- this.clock = requireNonNull(clock);
- }
-
- private Snapshot createSnapshot(StoreProvider storeProvider) {
- Snapshot snapshot = new Snapshot();
-
- // Capture timestamp to signify the beginning of a snapshot operation, apply after in case
- // one of the field closures is mean and tries to apply a timestamp.
- long timestamp = clock.nowMillis();
- for (SnapshotField field : snapshotFields) {
- field.save(storeProvider, snapshot);
- }
-
- SchedulerMetadata metadata = new SchedulerMetadata()
- .setFrameworkId(storeProvider.getSchedulerStore().fetchFrameworkId().orElse(null))
- .setDetails(buildInfo.getProperties());
-
- snapshot.setSchedulerMetadata(metadata);
- snapshot.setTimestamp(timestamp);
- return snapshot;
- }
-
- @Timed("snapshot_create")
- @Override
- public Snapshot from(StoreProvider stores) {
- return createSnapshot(stores);
- }
-
- @Timed("snapshot_apply")
- @Override
- public Stream<Op> asStream(Snapshot snapshot) {
- requireNonNull(snapshot);
-
- LOG.info("Restoring snapshot.");
- return snapshotFields.stream()
- .flatMap(field -> field.streamFrom(snapshot));
- }
-
- abstract class SnapshotField {
-
- abstract String getName();
-
- abstract void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot);
-
- abstract Stream<Op> doStreamFrom(Snapshot snapshot);
-
- void save(StoreProvider storeProvider, Snapshot snapshot) {
- stats.getUnchecked(SNAPSHOT_SAVE + getName())
- .time((Timeable.NoResult.Quiet) () -> saveToSnapshot(storeProvider, snapshot));
- }
-
- Stream<Op> streamFrom(Snapshot snapshot) {
- return stats.getUnchecked(SNAPSHOT_RESTORE + getName()).time(() -> doStreamFrom(snapshot));
- }
- }
-
- private final LoadingCache<String, SlidingStats> stats = CacheBuilder.newBuilder().build(
- new CacheLoader<String, SlidingStats>() {
- @Override
- public SlidingStats load(String name) throws Exception {
- return new SlidingStats(name, "nanos");
- }
- });
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotterImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotterImpl.java
new file mode 100644
index 0000000..4b52be0
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotterImpl.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Streams;
+
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.stats.SlidingStats;
+import org.apache.aurora.common.stats.SlidingStats.Timeable;
+import org.apache.aurora.common.util.BuildInfo;
+import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.QuotaConfiguration;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveFrameworkId;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdate;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.gen.storage.SchedulerMetadata;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.gen.storage.StoredCronJob;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Snapshotter implementation that builds a snapshot thrift struct from the underlying stores
+ * and converts a snapshot back into a stream of storage ops.
+ */
+public class SnapshotterImpl implements Snapshotter {
+
+ @VisibleForTesting
+ static final String SNAPSHOT_SAVE = "snapshot_save_";
+ @VisibleForTesting
+ static final String SNAPSHOT_RESTORE = "snapshot_restore_";
+
+ private static final Logger LOG = LoggerFactory.getLogger(SnapshotterImpl.class);
+
+ private static final String HOST_ATTRIBUTES_FIELD = "hosts";
+ private static final String QUOTA_FIELD = "quota";
+ private static final String TASK_FIELD = "tasks";
+ private static final String CRON_FIELD = "crons";
+ private static final String JOB_UPDATE_FIELD = "job_updates";
+ private static final String SCHEDULER_METADATA_FIELD = "scheduler_metadata";
+
+ @VisibleForTesting
+ Set<String> snapshotFieldNames() {
+ return snapshotFields.stream()
+ .map(SnapshotField::getName)
+ .collect(Collectors.toSet());
+ }
+
+ private final List<SnapshotField> snapshotFields = ImmutableList.of(
+ new SnapshotField() {
+ @Override
+ String getName() {
+ return HOST_ATTRIBUTES_FIELD;
+ }
+
+ @Override
+ void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+ snapshot.setHostAttributes(
+ IHostAttributes.toBuildersSet(store.getAttributeStore().getHostAttributes()));
+ }
+
+ @Override
+ Stream<Op> doStreamFrom(Snapshot snapshot) {
+ if (snapshot.getHostAttributesSize() > 0) {
+ return snapshot.getHostAttributes().stream()
+ .map(attributes -> Op.saveHostAttributes(
+ new SaveHostAttributes().setHostAttributes(attributes)));
+ }
+ return Stream.empty();
+ }
+ },
+ new SnapshotField() {
+ @Override
+ String getName() {
+ return TASK_FIELD;
+ }
+
+ @Override
+ void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+ snapshot.setTasks(
+ IScheduledTask.toBuildersSet(store.getTaskStore().fetchTasks(Query.unscoped())));
+ }
+
+ @Override
+ Stream<Op> doStreamFrom(Snapshot snapshot) {
+ if (snapshot.getTasksSize() > 0) {
+ return Stream.of(Op.saveTasks(new SaveTasks().setTasks(snapshot.getTasks())));
+ }
+ return Stream.empty();
+ }
+ },
+ new SnapshotField() {
+ @Override
+ String getName() {
+ return CRON_FIELD;
+ }
+
+ @Override
+ void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+ ImmutableSet.Builder<StoredCronJob> jobs = ImmutableSet.builder();
+
+ for (IJobConfiguration config : store.getCronJobStore().fetchJobs()) {
+ jobs.add(new StoredCronJob(config.newBuilder()));
+ }
+ snapshot.setCronJobs(jobs.build());
+ }
+
+ @Override
+ Stream<Op> doStreamFrom(Snapshot snapshot) {
+ if (snapshot.getCronJobsSize() > 0) {
+ return snapshot.getCronJobs().stream()
+ .map(job -> Op.saveCronJob(
+ new SaveCronJob().setJobConfig(job.getJobConfiguration())));
+ }
+ return Stream.empty();
+ }
+ },
+ new SnapshotField() {
+ @Override
+ String getName() {
+ return SCHEDULER_METADATA_FIELD;
+ }
+
+ @Override
+ void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+ // SchedulerMetadata is updated outside of the static list of SnapshotFields
+ }
+
+ @Override
+ Stream<Op> doStreamFrom(Snapshot snapshot) {
+ if (snapshot.isSetSchedulerMetadata()
+ && snapshot.getSchedulerMetadata().isSetFrameworkId()) {
+ // No delete necessary here since this is a single value.
+
+ return Stream.of(Op.saveFrameworkId(
+ new SaveFrameworkId().setId(snapshot.getSchedulerMetadata().getFrameworkId())));
+ }
+ return Stream.empty();
+ }
+ },
+ new SnapshotField() {
+ @Override
+ String getName() {
+ return QUOTA_FIELD;
+ }
+
+ @Override
+ void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+ ImmutableSet.Builder<QuotaConfiguration> quotas = ImmutableSet.builder();
+ for (Map.Entry<String, IResourceAggregate> entry
+ : store.getQuotaStore().fetchQuotas().entrySet()) {
+
+ quotas.add(new QuotaConfiguration(entry.getKey(), entry.getValue().newBuilder()));
+ }
+
+ snapshot.setQuotaConfigurations(quotas.build());
+ }
+
+ @Override
+ Stream<Op> doStreamFrom(Snapshot snapshot) {
+ if (snapshot.getQuotaConfigurationsSize() > 0) {
+ return snapshot.getQuotaConfigurations().stream()
+ .map(quota -> Op.saveQuota(new SaveQuota()
+ .setRole(quota.getRole())
+ .setQuota(quota.getQuota())));
+ }
+ return Stream.empty();
+ }
+ },
+ new SnapshotField() {
+ @Override
+ String getName() {
+ return JOB_UPDATE_FIELD;
+ }
+
+ @Override
+ void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+ snapshot.setJobUpdateDetails(
+ store.getJobUpdateStore().fetchJobUpdates(JobUpdateStore.MATCH_ALL).stream()
+ .map(u -> new StoredJobUpdateDetails().setDetails(u.newBuilder()))
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ Stream<Op> doStreamFrom(Snapshot snapshot) {
+ if (snapshot.getJobUpdateDetailsSize() > 0) {
+ return snapshot.getJobUpdateDetails().stream()
+ .flatMap(details -> {
+ Stream<Op> parent = Stream.of(Op.saveJobUpdate(
+ new SaveJobUpdate().setJobUpdate(details.getDetails().getUpdate())));
+ Stream<Op> jobEvents;
+ if (details.getDetails().getUpdateEventsSize() > 0) {
+ jobEvents = details.getDetails().getUpdateEvents().stream()
+ .map(event -> Op.saveJobUpdateEvent(
+ new SaveJobUpdateEvent()
+ .setKey(details.getDetails().getUpdate().getSummary().getKey())
+ .setEvent(event)));
+ } else {
+ jobEvents = Stream.empty();
+ }
+
+ Stream<Op> instanceEvents;
+ if (details.getDetails().getInstanceEventsSize() > 0) {
+ instanceEvents = details.getDetails().getInstanceEvents().stream()
+ .map(event -> Op.saveJobInstanceUpdateEvent(
+ new SaveJobInstanceUpdateEvent()
+ .setKey(details.getDetails().getUpdate().getSummary().getKey())
+ .setEvent(event)));
+ } else {
+ instanceEvents = Stream.empty();
+ }
+
+ return Streams.concat(parent, jobEvents, instanceEvents);
+ });
+ }
+ return Stream.empty();
+ }
+ }
+ );
+
+ private final BuildInfo buildInfo;
+ private final Clock clock;
+
+ @Inject
+ public SnapshotterImpl(BuildInfo buildInfo, Clock clock) {
+ this.buildInfo = requireNonNull(buildInfo);
+ this.clock = requireNonNull(clock);
+ }
+
+ private Snapshot createSnapshot(StoreProvider storeProvider) {
+ Snapshot snapshot = new Snapshot();
+
+ // Capture timestamp to signify the beginning of a snapshot operation, apply after in case
+ // one of the field closures is mean and tries to apply a timestamp.
+ long timestamp = clock.nowMillis();
+ for (SnapshotField field : snapshotFields) {
+ field.save(storeProvider, snapshot);
+ }
+
+ SchedulerMetadata metadata = new SchedulerMetadata()
+ .setFrameworkId(storeProvider.getSchedulerStore().fetchFrameworkId().orElse(null))
+ .setDetails(buildInfo.getProperties());
+
+ snapshot.setSchedulerMetadata(metadata);
+ snapshot.setTimestamp(timestamp);
+ return snapshot;
+ }
+
+ @Timed("snapshot_create")
+ @Override
+ public Snapshot from(StoreProvider stores) {
+ return createSnapshot(stores);
+ }
+
+ @Timed("snapshot_apply")
+ @Override
+ public Stream<Op> asStream(Snapshot snapshot) {
+ requireNonNull(snapshot);
+
+ LOG.info("Restoring snapshot.");
+ return snapshotFields.stream()
+ .flatMap(field -> field.streamFrom(snapshot));
+ }
+
+ abstract class SnapshotField {
+
+ abstract String getName();
+
+ abstract void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot);
+
+ abstract Stream<Op> doStreamFrom(Snapshot snapshot);
+
+ void save(StoreProvider storeProvider, Snapshot snapshot) {
+ stats.getUnchecked(SNAPSHOT_SAVE + getName())
+ .time((Timeable.NoResult.Quiet) () -> saveToSnapshot(storeProvider, snapshot));
+ }
+
+ Stream<Op> streamFrom(Snapshot snapshot) {
+ return stats.getUnchecked(SNAPSHOT_RESTORE + getName()).time(() -> doStreamFrom(snapshot));
+ }
+ }
+
+ private final LoadingCache<String, SlidingStats> stats = CacheBuilder.newBuilder().build(
+ new CacheLoader<String, SlidingStats>() {
+ @Override
+ public SlidingStats load(String name) throws Exception {
+ return new SlidingStats(name, "nanos");
+ }
+ });
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 77fa904..63c338e 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -76,12 +76,14 @@ import org.apache.aurora.scheduler.mesos.DriverSettings;
import org.apache.aurora.scheduler.mesos.FrameworkInfoFactory;
import org.apache.aurora.scheduler.mesos.TestExecutorSettings;
import org.apache.aurora.scheduler.storage.backup.BackupModule;
+import org.apache.aurora.scheduler.storage.durability.DurableStorageModule;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.IServerInfo;
import org.apache.aurora.scheduler.storage.log.EntrySerializer;
-import org.apache.aurora.scheduler.storage.log.LogStorageModule;
-import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
+import org.apache.aurora.scheduler.storage.log.LogPersistenceModule;
+import org.apache.aurora.scheduler.storage.log.SnapshotModule;
+import org.apache.aurora.scheduler.storage.log.SnapshotterImpl;
import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher;
import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher.StreamMatcher;
import org.apache.mesos.Protos;
@@ -198,7 +200,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
BackupModule.Options backupOptions = new BackupModule.Options();
backupOptions.backupDir = backupDir;
- install(new BackupModule(backupOptions, SnapshotStoreImpl.class));
+ install(new BackupModule(backupOptions, SnapshotterImpl.class));
bind(IServerInfo.class).toInstance(
IServerInfo.build(
@@ -217,7 +219,9 @@ public class SchedulerIT extends BaseZooKeeperTest {
ImmutableList.<Module>builder()
.add(SchedulerMain.getUniversalModule(new CliOptions()))
.add(new TierModule(TaskTestUtil.TIER_CONFIG))
- .add(new LogStorageModule(new LogStorageModule.Options()))
+ .add(new DurableStorageModule())
+ .add(new LogPersistenceModule(new LogPersistenceModule.Options()))
+ .add(new SnapshotModule(new SnapshotModule.Options()))
.add(new ServiceDiscoveryModule(zkClientConfig, SERVERSET_PATH))
.add(testModule)
.build()
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
index 53a2315..f685d2e 100644
--- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
@@ -171,8 +171,8 @@ public class CommandLineTest {
expected.updater.enableAffinity = true;
expected.updater.affinityExpiration = TEST_TIME;
expected.state.taskAssignerModules = ImmutableList.of(NoopModule.class);
- expected.logStorage.snapshotInterval = TEST_TIME;
- expected.logStorage.maxLogEntrySize = TEST_DATA;
+ expected.snapshot.snapshotInterval = TEST_TIME;
+ expected.logPersistence.maxLogEntrySize = TEST_DATA;
expected.backup.backupInterval = TEST_TIME;
expected.backup.maxSavedBackups = 42;
expected.backup.backupDir = new File("testing");
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/storage/durability/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/RecoveryTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/RecoveryTest.java
new file mode 100644
index 0000000..4cbd0cf
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/RecoveryTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.collect.Lists;
+
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class RecoveryTest {
+
+ @Test
+ public void testRecover() {
+ ListPersistence from = new ListPersistence(
+ Edit.op(Op.saveQuota(new SaveQuota())),
+ Edit.op(Op.saveTasks(new SaveTasks())));
+ ListPersistence to = new ListPersistence();
+
+ Recovery.copy(from, to, 100);
+
+ assertEquals(from.edits, to.edits);
+ }
+
+ @Test
+ public void testRecoverWithDeleteAll() {
+ ListPersistence from = new ListPersistence(
+ Edit.deleteAll(),
+ Edit.op(Op.saveQuota(new SaveQuota())),
+ Edit.op(Op.saveTasks(new SaveTasks())));
+ ListPersistence to = new ListPersistence();
+
+ Recovery.copy(from, to, 100);
+
+ assertEquals(from.edits.subList(1, from.edits.size()), to.edits);
+ }
+
+ @Test
+ public void testRequiresEmptyTarget() {
+ ListPersistence from = new ListPersistence();
+ ListPersistence to = new ListPersistence(Edit.op(Op.saveQuota(new SaveQuota())));
+
+ try {
+ Recovery.copy(from, to, 100);
+ fail();
+ } catch (IllegalStateException e) {
+ // expected.
+ }
+ }
+
+ @Test
+ public void testDeleteAllAfterFirstPosition() {
+ ListPersistence from = new ListPersistence(
+ Edit.op(Op.saveQuota(new SaveQuota())),
+ Edit.deleteAll(),
+ Edit.op(Op.saveTasks(new SaveTasks())));
+ ListPersistence to = new ListPersistence();
+
+ try {
+ Recovery.copy(from, to, 100);
+ fail();
+ } catch (IllegalStateException e) {
+ // expected
+ }
+ }
+
+ private static class ListPersistence implements Persistence {
+
+ private final List<Edit> edits;
+
+ ListPersistence(Edit... edits) {
+ this.edits = Lists.newArrayList(edits);
+ }
+
+ @Override
+ public void prepare() {
+ // no-op
+ }
+
+ @Override
+ public Stream<Edit> recover() {
+ return edits.stream();
+ }
+
+ @Override
+ public void persist(Stream<Op> records) throws PersistenceException {
+ edits.addAll(records.map(Edit::op).collect(Collectors.toList()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java
index 3d6d555..a84e408 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java
@@ -45,7 +45,7 @@ import org.apache.aurora.scheduler.storage.Snapshotter;
import org.apache.aurora.scheduler.storage.Storage.Volatile;
import org.apache.aurora.scheduler.storage.durability.Persistence;
import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
-import org.apache.aurora.scheduler.storage.log.LogStorageModule.Options;
+import org.apache.aurora.scheduler.storage.log.LogPersistenceModule.Options;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.junit.Before;
@@ -67,7 +67,7 @@ public class LogPersistenceTest extends EasyMockTest {
mockStream = createMock(Stream.class);
Injector injector = Guice.createInjector(
- new LogStorageModule(new Options()),
+ new LogPersistenceModule(new Options()),
new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)),
new TierModule(TaskTestUtil.TIER_CONFIG),
new AbstractModule() {
@@ -77,7 +77,7 @@ public class LogPersistenceTest extends EasyMockTest {
bind(EventSink.class).toInstance(e -> { });
bind(BuildInfo.class).toInstance(FakeBuildInfo.generateBuildInfo());
bind(Clock.class).toInstance(new FakeClock());
- bind(Snapshotter.class).to(SnapshotStoreImpl.class);
+ bind(Snapshotter.class).to(SnapshotterImpl.class);
bind(Log.class).toInstance(mockLog);
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
index ffd4167..3bd8bf6 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
@@ -27,7 +27,6 @@ import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImp
import org.apache.aurora.common.collections.Pair;
import org.apache.aurora.common.inject.Bindings;
import org.apache.aurora.common.quantity.Data;
-import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.testing.TearDownTestCase;
import org.apache.aurora.common.util.BuildInfo;
@@ -36,7 +35,6 @@ import org.apache.aurora.common.util.testing.FakeBuildInfo;
import org.apache.aurora.common.util.testing.FakeClock;
import org.apache.aurora.scheduler.TierModule;
import org.apache.aurora.scheduler.config.types.DataAmount;
-import org.apache.aurora.scheduler.config.types.TimeAmount;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.log.Log;
import org.apache.aurora.scheduler.resources.ResourceTestUtil;
@@ -46,8 +44,9 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.Storage.Volatile;
+import org.apache.aurora.scheduler.storage.durability.DurableStorageModule;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.log.LogStorageModule.Options;
+import org.apache.aurora.scheduler.storage.log.LogPersistenceModule.Options;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.junit.Before;
@@ -74,12 +73,13 @@ public class NonVolatileStorageTest extends TearDownTestCase {
Options options = new Options();
options.maxLogEntrySize = new DataAmount(1, Data.GB);
- options.snapshotInterval = new TimeAmount(1, Time.DAYS);
ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl();
Injector injector = Guice.createInjector(
new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)),
- new LogStorageModule(options),
+ new DurableStorageModule(),
+ new LogPersistenceModule(options),
+ new SnapshotModule(new SnapshotModule.Options()),
new TierModule(new TierModule.Options()),
new AbstractModule() {
@Override
@@ -90,7 +90,7 @@ public class NonVolatileStorageTest extends TearDownTestCase {
bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
bind(StatsProvider.class).toInstance(new FakeStatsProvider());
bind(Log.class).toInstance(log);
- bind(Snapshotter.class).to(SnapshotStoreImpl.class);
+ bind(Snapshotter.class).to(SnapshotterImpl.class);
}
}
);
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java
index 270453d..e37d566 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java
@@ -48,7 +48,8 @@ import org.apache.aurora.scheduler.storage.SnapshotStore;
import org.apache.aurora.scheduler.storage.Snapshotter;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
import org.apache.aurora.scheduler.storage.Storage.Volatile;
-import org.apache.aurora.scheduler.storage.log.LogStorageModule.Options;
+import org.apache.aurora.scheduler.storage.durability.DurableStorageModule;
+import org.apache.aurora.scheduler.storage.log.SnapshotModule.Options;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.easymock.IAnswer;
@@ -84,7 +85,9 @@ public class SnapshotServiceTest extends EasyMockTest {
Injector injector = Guice.createInjector(
new SchedulerServicesModule(),
- new LogStorageModule(options),
+ new LogPersistenceModule(new LogPersistenceModule.Options()),
+ new SnapshotModule(options),
+ new DurableStorageModule(),
new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)),
new TierModule(TaskTestUtil.TIER_CONFIG),
new AbstractModule() {