You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/03/01 00:07:41 UTC
git commit: Add support for slaveHosts set in TaskQuery.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 130bc4873 -> e9c09b121
Add support for slaveHosts set in TaskQuery.
Bugs closed: AURORA-232
Reviewed at https://reviews.apache.org/r/18526/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/e9c09b12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/e9c09b12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/e9c09b12
Branch: refs/heads/master
Commit: e9c09b121a0b623eb2b1d573554758b82bcc147b
Parents: 130bc48
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Feb 28 15:02:06 2014 -0800
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Feb 28 15:02:06 2014 -0800
----------------------------------------------------------------------
.../org/apache/aurora/scheduler/base/Query.java | 31 ++++++++++---
.../aurora/scheduler/http/Maintenance.java | 4 +-
.../scheduler/storage/mem/MemTaskStore.java | 46 +++++++++++---------
src/main/python/apache/aurora/client/api/sla.py | 3 +-
.../thrift/org/apache/aurora/gen/api.thrift | 2 +-
.../scheduler/storage/mem/MemTaskStoreTest.java | 12 +++++
.../org/apache/aurora/gen/api.thrift.md5 | 2 +-
7 files changed, 67 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9c09b12/src/main/java/org/apache/aurora/scheduler/base/Query.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Query.java b/src/main/java/org/apache/aurora/scheduler/base/Query.java
index b9f207c..d6f27fd 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Query.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Query.java
@@ -111,8 +111,12 @@ public final class Query {
return unscoped().byId(taskIds);
}
- public static Builder slaveScoped(String slaveHost) {
- return unscoped().bySlave(slaveHost);
+ public static Builder slaveScoped(String slaveHost, String... slaveHosts) {
+ return unscoped().bySlave(slaveHost, slaveHosts);
+ }
+
+ public static Builder slaveScoped(Iterable<String> slaveHosts) {
+ return unscoped().bySlave(slaveHosts);
}
public static Builder statusScoped(ScheduleStatus status, ScheduleStatus... statuses) {
@@ -257,16 +261,31 @@ public final class Query {
}
/**
- * Returns a new builder scoped to the slave uniquely identified by the given slaveHost. A
+ * Returns a new builder scoped to the slaves uniquely identified by the given slaveHosts. A
* builder can only be scoped to slaves once.
*
* @param slaveHost The hostname of the slave to scope the query to.
- * @return A new Builder scoped to the given slave.
+ * @param slaveHosts Additional slave hostnames to scope this query to (they are ORed together).
+ * @return A new Builder scoped to the given slaves.
*/
- public Builder bySlave(String slaveHost) {
+ public Builder bySlave(String slaveHost, String... slaveHosts) {
checkNotNull(slaveHost);
- return new Builder(query.deepCopy().setSlaveHost(slaveHost));
+ return bySlave(ImmutableSet.<String>builder().add(slaveHost).add(slaveHosts).build());
+ }
+
+ /**
+ * Creates a new builder scoped to slaveHosts.
+ *
+ * @see Builder#bySlave(String, String...)
+ *
+ * @param slaveHosts The slaveHost to scope this builder to.
+ * @return A new Builder scoped to the slaveHosts.
+ */
+ public Builder bySlave(Iterable<String> slaveHosts) {
+ checkNotNull(slaveHosts);
+
+ return new Builder(query.deepCopy().setSlaveHosts(ImmutableSet.copyOf(slaveHosts)));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9c09b12/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java b/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
index 7337044..789eaaf 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
@@ -78,9 +78,7 @@ public class Maintenance {
private Multimap<String, String> getTasksByHosts(StoreProvider provider, Iterable<String> hosts) {
ImmutableSet.Builder<IScheduledTask> drainingTasks = ImmutableSet.builder();
- for (String host : hosts) {
- drainingTasks.addAll(provider.getTaskStore().fetchTasks(Query.slaveScoped(host).active()));
- }
+ drainingTasks.addAll(provider.getTaskStore().fetchTasks(Query.slaveScoped(hosts).active()));
return Multimaps.transformValues(
Multimaps.index(drainingTasks.build(), Tasks.SCHEDULED_TO_SLAVE_HOST),
Tasks.SCHEDULED_TO_ID);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9c09b12/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
index d1ab503..421b330 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
@@ -72,18 +72,19 @@ class MemTaskStore implements TaskStore.Mutable {
private final long slowQueryThresholdNanos = SLOW_QUERY_LOG_THRESHOLD.get().as(Time.NANOSECONDS);
- private static final Function<Query.Builder, Optional<IJobKey>> QUERY_TO_JOB_KEY =
- new Function<Query.Builder, Optional<IJobKey>>() {
+ private static final Function<Query.Builder, Optional<Set<IJobKey>>> QUERY_TO_JOB_KEY =
+ new Function<Query.Builder, Optional<Set<IJobKey>>>() {
@Override
- public Optional<IJobKey> apply(Query.Builder query) {
- return JobKeys.from(query);
+ public Optional<Set<IJobKey>> apply(Query.Builder query) {
+ Optional<IJobKey> jobkey = JobKeys.from(query);
+ return jobkey.isPresent() ? Optional.of(jobkey.asSet()) : Optional.<Set<IJobKey>>absent();
}
};
- private static final Function<Query.Builder, Optional<String>> QUERY_TO_SLAVE_HOST =
- new Function<Query.Builder, Optional<String>>() {
+ private static final Function<Query.Builder, Optional<Set<String>>> QUERY_TO_SLAVE_HOST =
+ new Function<Query.Builder, Optional<Set<String>>>() {
@Override
- public Optional<String> apply(Query.Builder query) {
- return Optional.fromNullable(query.get().getSlaveHost());
+ public Optional<Set<String>> apply(Query.Builder query) {
+ return Optional.fromNullable(query.get().getSlaveHosts());
}
};
@@ -265,8 +266,8 @@ class MemTaskStore implements TaskStore.Mutable {
return false;
}
}
- if (!StringUtils.isEmpty(query.getSlaveHost())) {
- if (!query.getSlaveHost().equals(task.getAssignedTask().getSlaveHost())) {
+ if (query.getSlaveHostsSize() > 0) {
+ if (!query.getSlaveHosts().contains(task.getAssignedTask().getSlaveHost())) {
return false;
}
}
@@ -365,19 +366,19 @@ class MemTaskStore implements TaskStore.Mutable {
private final Multimap<K, String> index =
Multimaps.synchronizedSetMultimap(HashMultimap.<K, String>create());
private final Function<IScheduledTask, K> indexer;
- private final Function<Query.Builder, Optional<K>> queryExtractor;
+ private final Function<Query.Builder, Optional<Set<K>>> queryExtractor;
private final AtomicLong hitCount;
/**
* Creates a secondary index that will extract keys from tasks using the provided indexer.
*
* @param indexer Indexing function.
- * @param queryExtractor Function to extract the key relevant to a query.
- * @param hitCount Counter for number of times the seconary index applies to a query.
+ * @param queryExtractor Function to extract the keys relevant to a query.
+ * @param hitCount Counter for number of times the secondary index applies to a query.
*/
SecondaryIndex(
Function<IScheduledTask, K> indexer,
- Function<Query.Builder, Optional<K>> queryExtractor,
+ Function<Query.Builder, Optional<Set<K>>> queryExtractor,
AtomicLong hitCount) {
this.indexer = indexer;
@@ -416,12 +417,17 @@ class MemTaskStore implements TaskStore.Mutable {
}
}
- private final Function<K, Iterable<String>> lookup = new Function<K, Iterable<String>>() {
- @Override
- public Iterable<String> apply(K key) {
- hitCount.incrementAndGet();
- return index.get(key);
- }
+ private final Function<Set<K>, Iterable<String>> lookup =
+ new Function<Set<K>, Iterable<String>>() {
+ @Override
+ public Iterable<String> apply(Set<K> keys) {
+ hitCount.incrementAndGet();
+ ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+ for (K key : keys) {
+ builder.addAll(index.get(key));
+ }
+ return builder.build();
+ }
};
Optional<Iterable<String>> getMatches(Query.Builder query) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9c09b12/src/main/python/apache/aurora/client/api/sla.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/sla.py b/src/main/python/apache/aurora/client/api/sla.py
index a7c3158..71013cd 100644
--- a/src/main/python/apache/aurora/client/api/sla.py
+++ b/src/main/python/apache/aurora/client/api/sla.py
@@ -254,10 +254,9 @@ class Sla(object):
check_and_log_response(resp)
return resp.result.scheduleStatusResult.tasks
- def _create_task_query(self, job_key=None, host=None):
+ def _create_task_query(self, job_key=None):
return TaskQuery(
owner=Identity(role=job_key.role) if job_key else None,
environment=job_key.env if job_key else None,
jobName=job_key.name if job_key else None,
- slaveHost=host,
statuses=SLA_LIVE_STATES)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9c09b12/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/src/main/thrift/org/apache/aurora/gen/api.thrift b/src/main/thrift/org/apache/aurora/gen/api.thrift
index 3ee24c7..f9fc6bc 100644
--- a/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -359,8 +359,8 @@ struct TaskQuery {
2: string jobName
4: set<string> taskIds
5: set<ScheduleStatus> statuses
- 6: string slaveHost
7: set<i32> instanceIds
+ 10: set<string> slaveHosts
}
struct HostStatus {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9c09b12/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
index 884f589..0c1b144 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
@@ -107,6 +107,18 @@ public class MemTaskStoreTest {
}
@Test
+ public void testQueryBySlaveHost() {
+ String hostA = "slaveA";
+ String hostB = "slaveB";
+ final IScheduledTask a = setHost(makeTask("a", "role", "env", "job"), Optional.of(hostA));
+ final IScheduledTask b = setHost(makeTask("b", "role", "env", "job"), Optional.of(hostB));
+ store.saveTasks(ImmutableSet.of(a, b));
+
+ assertQueryResults(Query.slaveScoped(hostA), a);
+ assertQueryResults(Query.slaveScoped(hostA, hostB), a, b);
+ }
+
+ @Test
public void testMutate() {
store.saveTasks(ImmutableSet.of(TASK_A, TASK_B, TASK_C, TASK_D));
assertQueryResults(Query.statusScoped(RUNNING));
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9c09b12/src/test/resources/org/apache/aurora/gen/api.thrift.md5
----------------------------------------------------------------------
diff --git a/src/test/resources/org/apache/aurora/gen/api.thrift.md5 b/src/test/resources/org/apache/aurora/gen/api.thrift.md5
index 4e6c51d..08c4c2a 100644
--- a/src/test/resources/org/apache/aurora/gen/api.thrift.md5
+++ b/src/test/resources/org/apache/aurora/gen/api.thrift.md5
@@ -1 +1 @@
-7645e899bf5b7f8ad2dcdbecae7f91e1
+7bf1d42fb84abddbfd8f580dd7dfdb03