You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2015/04/14 23:44:36 UTC
aurora git commit: Return Iterable from TaskStore.fetchTasks to allow for streaming.
Repository: aurora
Updated Branches:
refs/heads/master 1dc11fb1a -> ef0975655
Return Iterable from TaskStore.fetchTasks to allow for streaming.
Reviewed at https://reviews.apache.org/r/33105/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ef097565
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ef097565
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ef097565
Branch: refs/heads/master
Commit: ef0975655c04f0c2f3ecb6599d4e4beb9547f091
Parents: 1dc11fb
Author: Bill Farner <wf...@apache.org>
Authored: Tue Apr 14 14:44:18 2015 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Apr 14 14:44:18 2015 -0700
----------------------------------------------------------------------
.../aurora/scheduler/async/GcExecutorLauncher.java | 4 ++--
.../org/apache/aurora/scheduler/async/KillRetry.java | 3 ++-
.../aurora/scheduler/async/RescheduleCalculator.java | 2 +-
.../apache/aurora/scheduler/async/TaskHistoryPruner.java | 7 ++++---
.../aurora/scheduler/cron/quartz/AuroraCronJob.java | 3 ++-
.../aurora/scheduler/state/MaintenanceController.java | 5 +++--
.../apache/aurora/scheduler/state/StateManagerImpl.java | 2 +-
.../apache/aurora/scheduler/storage/ForwardingStore.java | 3 +--
.../org/apache/aurora/scheduler/storage/Storage.java | 8 +++-----
.../org/apache/aurora/scheduler/storage/TaskStore.java | 2 +-
.../apache/aurora/scheduler/storage/backup/Recovery.java | 6 +++---
.../scheduler/storage/backup/TemporaryStorage.java | 8 ++++----
.../aurora/scheduler/thrift/ReadOnlySchedulerImpl.java | 8 +++-----
.../scheduler/thrift/SchedulerThriftInterface.java | 11 ++++++-----
.../apache/aurora/scheduler/async/TaskSchedulerTest.java | 3 ++-
15 files changed, 38 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
index 1da35c0..4d589a3 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.async;
import java.util.Collections;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
@@ -180,7 +179,8 @@ public class GcExecutorLauncher implements TaskLauncher {
}
private TaskInfo makeGcTask(String hostName, SlaveID slaveId) {
- Set<IScheduledTask> tasksOnHost = Storage.Util.fetchTasks(storage, Query.slaveScoped(hostName));
+ Iterable<IScheduledTask> tasksOnHost =
+ Storage.Util.fetchTasks(storage, Query.slaveScoped(hostName));
tasksCreated.incrementAndGet();
return makeGcTask(
hostName,
http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java b/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
index 3bb80ec..b125c1c 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
@@ -21,6 +21,7 @@ import java.util.logging.Logger;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
import com.google.common.eventbus.Subscribe;
import com.twitter.common.stats.StatsProvider;
import com.twitter.common.util.BackoffStrategy;
@@ -88,7 +89,7 @@ public class KillRetry implements EventSubscriber {
@Override
public void run() {
Query.Builder query = Query.taskScoped(taskId).byStatus(ScheduleStatus.KILLING);
- if (!Storage.Util.fetchTasks(storage, query).isEmpty()) {
+ if (!Iterables.isEmpty(Storage.Util.fetchTasks(storage, query))) {
LOG.info("Task " + taskId + " not yet killed, retrying.");
// Kill did not yet take effect, try again.
http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
index 0cf7fb4..6a0c0a9 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
@@ -146,7 +146,7 @@ public interface RescheduleCalculator {
return Optional.absent();
}
- Set<IScheduledTask> res =
+ Iterable<IScheduledTask> res =
Storage.Util.fetchTasks(storage, Query.taskScoped(task.getAncestorId()));
return Optional.fromNullable(Iterables.getOnlyElement(res, null));
http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
index 985a319..7b6c063 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
@@ -155,10 +155,11 @@ public class TaskHistoryPruner implements EventSubscriber {
executor.submit(new Runnable() {
@Override
public void run() {
- Set<IScheduledTask> inactiveTasks =
+ Iterable<IScheduledTask> inactiveTasks =
Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey));
- int tasksToPrune = inactiveTasks.size() - settings.perJobHistoryGoal;
- if (tasksToPrune > 0 && inactiveTasks.size() > settings.perJobHistoryGoal) {
+ int numInactiveTasks = Iterables.size(inactiveTasks);
+ int tasksToPrune = numInactiveTasks - settings.perJobHistoryGoal;
+ if (tasksToPrune > 0 && numInactiveTasks > settings.perJobHistoryGoal) {
Set<String> toPrune = FluentIterable
.from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks))
.filter(safeToDelete)
http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
index 3b5dcf8..df180a4 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
@@ -23,6 +23,7 @@ import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
import com.twitter.common.base.Supplier;
import com.twitter.common.stats.Stats;
import com.twitter.common.util.BackoffHelper;
@@ -200,7 +201,7 @@ class AuroraCronJob implements Job {
delayedStartBackoff.doUntilSuccess(new Supplier<Boolean>() {
@Override
public Boolean get() {
- if (Storage.Util.fetchTasks(storage, query).isEmpty()) {
+ if (Iterables.isEmpty(Storage.Util.fetchTasks(storage, query))) {
LOG.info("Initiating delayed launch of cron " + path);
storage.write(new Storage.MutateWork.NoResult.Quiet() {
@Override
http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 09be4dc..a6d7ab7 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -24,6 +24,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.eventbus.Subscribe;
@@ -161,8 +162,8 @@ public interface MaintenanceController {
store.getAttributeStore().getHostAttributes(host);
if (attributes.isPresent() && attributes.get().getMode() == DRAINING) {
Query.Builder builder = Query.slaveScoped(host).active();
- Set<IScheduledTask> activeTasks = store.getTaskStore().fetchTasks(builder);
- if (activeTasks.isEmpty()) {
+ Iterable<IScheduledTask> activeTasks = store.getTaskStore().fetchTasks(builder);
+ if (Iterables.isEmpty(activeTasks)) {
LOG.info(String.format("Moving host %s into DRAINED", host));
setMaintenanceMode(store, ImmutableSet.of(host), DRAINED);
} else {
http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index b6a7b4a..2a943cf 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -126,7 +126,7 @@ public class StateManagerImpl implements StateManager {
}
}).toSet();
- ImmutableSet<IScheduledTask> existingTasks = storeProvider.getTaskStore().fetchTasks(
+ Iterable<IScheduledTask> existingTasks = storeProvider.getTaskStore().fetchTasks(
Query.jobScoped(task.getJob()).active());
Set<Integer> existingInstanceIds =
http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
index a8e3b14..1a63169 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
@@ -18,7 +18,6 @@ import java.util.Map;
import java.util.Set;
import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
import org.apache.aurora.scheduler.base.Query;
@@ -105,7 +104,7 @@ public class ForwardingStore implements
}
@Override
- public ImmutableSet<IScheduledTask> fetchTasks(Query.Builder querySupplier) {
+ public Iterable<IScheduledTask> fetchTasks(Query.Builder querySupplier) {
return taskStore.fetchTasks(querySupplier);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 6180a36..972a3c1 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -20,8 +20,6 @@ import java.lang.annotation.Target;
import javax.inject.Qualifier;
-import com.google.common.collect.ImmutableSet;
-
import org.apache.aurora.scheduler.base.Query.Builder;
import org.apache.aurora.scheduler.base.SchedulerException;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
@@ -275,10 +273,10 @@ public interface Storage {
* @param query Builder of the query to perform.
* @return Tasks returned from the query.
*/
- public static ImmutableSet<IScheduledTask> fetchTasks(Storage storage, final Builder query) {
- return storage.read(new Work.Quiet<ImmutableSet<IScheduledTask>>() {
+ public static Iterable<IScheduledTask> fetchTasks(Storage storage, final Builder query) {
+ return storage.read(new Work.Quiet<Iterable<IScheduledTask>>() {
@Override
- public ImmutableSet<IScheduledTask> apply(StoreProvider storeProvider) {
+ public Iterable<IScheduledTask> apply(StoreProvider storeProvider) {
return storeProvider.getTaskStore().fetchTasks(query);
}
});
http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
index b76c937..2768e6e 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -34,7 +34,7 @@ public interface TaskStore {
* @param query Builder of the query to identify tasks with.
* @return A read-only view of matching tasks.
*/
- ImmutableSet<IScheduledTask> fetchTasks(Query.Builder query);
+ Iterable<IScheduledTask> fetchTasks(Query.Builder query);
interface Mutable extends TaskStore {
http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
index 38764e5..fb0dbae 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
@@ -67,7 +67,7 @@ public interface Recovery {
* @return Tasks matching the query.
* @throws RecoveryException If a backup is not staged, or could not be queried.
*/
- Set<IScheduledTask> query(Query.Builder query) throws RecoveryException;
+ Iterable<IScheduledTask> query(Query.Builder query) throws RecoveryException;
/**
* Deletes tasks from a staged backup.
@@ -163,7 +163,7 @@ public interface Recovery {
}
@Override
- public Set<IScheduledTask> query(Query.Builder query) throws RecoveryException {
+ public Iterable<IScheduledTask> query(Query.Builder query) throws RecoveryException {
return getLoadedRecovery().query(query);
}
@@ -203,7 +203,7 @@ public interface Recovery {
});
}
- Set<IScheduledTask> query(final Query.Builder query) {
+ Iterable<IScheduledTask> query(final Query.Builder query) {
return tempStorage.fetchTasks(query);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
index 2102adb..586b53b 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -51,7 +51,7 @@ interface TemporaryStorage {
* @param query Query builder for tasks to fetch.
* @return Matching tasks.
*/
- Set<IScheduledTask> fetchTasks(Query.Builder query);
+ Iterable<IScheduledTask> fetchTasks(Query.Builder query);
/**
* Creates a snapshot of the contents of the temporary storage.
@@ -87,10 +87,10 @@ interface TemporaryStorage {
}
@Override
- public Set<IScheduledTask> fetchTasks(final Query.Builder query) {
- return storage.read(new Work.Quiet<Set<IScheduledTask>>() {
+ public Iterable<IScheduledTask> fetchTasks(final Query.Builder query) {
+ return storage.read(new Work.Quiet<Iterable<IScheduledTask>>() {
@Override
- public Set<IScheduledTask> apply(StoreProvider storeProvider) {
+ public Iterable<IScheduledTask> apply(StoreProvider storeProvider) {
return storeProvider.getTaskStore().fetchTasks(query);
}
});
http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
index 7aef1ca..30e579c 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
@@ -209,11 +209,9 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
public Response getConfigSummary(JobKey job) throws TException {
IJobKey jobKey = JobKeys.assertValid(IJobKey.build(job));
- Set<IScheduledTask> activeTasks =
- Storage.Util.fetchTasks(storage, Query.jobScoped(jobKey).active());
-
- Iterable<IAssignedTask> assignedTasks =
- Iterables.transform(activeTasks, Tasks.SCHEDULED_TO_ASSIGNED);
+ Iterable<IAssignedTask> assignedTasks = Iterables.transform(
+ Storage.Util.fetchTasks(storage, Query.jobScoped(jobKey).active()),
+ Tasks.SCHEDULED_TO_ASSIGNED);
Map<Integer, ITaskConfig> tasksByInstance = Maps.transformValues(
Maps.uniqueIndex(assignedTasks, Tasks.ASSIGNED_TO_INSTANCE_ID),
Tasks.ASSIGNED_TO_INFO);
http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index b7d3874..160db12 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -293,7 +293,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
}
private void checkJobExists(StoreProvider store, IJobKey jobKey) throws JobExistsException {
- if (!store.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active()).isEmpty()
+ if (!Iterables.isEmpty(store.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active()))
|| getCronJob(store, jobKey).isPresent()) {
throw new JobExistsException(jobAlreadyExistsMessage(jobKey));
@@ -615,8 +615,9 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
}
Query.Builder query = Query.instanceScoped(jobKey, shardIds).active();
- final Set<IScheduledTask> matchingTasks = storeProvider.getTaskStore().fetchTasks(query);
- if (matchingTasks.size() != shardIds.size()) {
+ final Iterable<IScheduledTask> matchingTasks =
+ storeProvider.getTaskStore().fetchTasks(query);
+ if (Iterables.size(matchingTasks) != shardIds.size()) {
return invalidRequest("Not all requested shards are active.");
}
@@ -935,12 +936,12 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
ILockKey.build(LockKey.job(jobKey.newBuilder())),
Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
- ImmutableSet<IScheduledTask> currentTasks = storeProvider.getTaskStore().fetchTasks(
+ Iterable<IScheduledTask> currentTasks = storeProvider.getTaskStore().fetchTasks(
Query.jobScoped(task.getJob()).active());
validateTaskLimits(
task,
- currentTasks.size() + config.getInstanceIdsSize(),
+ Iterables.size(currentTasks) + config.getInstanceIdsSize(),
quotaManager.checkInstanceAddition(task, config.getInstanceIdsSize(), storeProvider));
storage.write(new NoResult.Quiet() {
http://git-wip-us.apache.org/repos/asf/aurora/blob/ef097565/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 34cbd19..858069e 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -21,6 +21,7 @@ import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
@@ -181,7 +182,7 @@ public class TaskSchedulerTest extends EasyMockTest {
@Override
protected void execute(MutableStoreProvider storeProvider) {
TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
- if (taskStore.fetchTasks(Query.taskScoped(Tasks.id(copy))).isEmpty()) {
+ if (Iterables.isEmpty(taskStore.fetchTasks(Query.taskScoped(Tasks.id(copy))))) {
taskStore.saveTasks(ImmutableSet.of(copy));
}
}