You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/03/01 00:07:41 UTC

git commit: Add support for slaveHosts set in TaskQuery.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 130bc4873 -> e9c09b121


Add support for slaveHosts set in TaskQuery.

Bugs closed: AURORA-232

Reviewed at https://reviews.apache.org/r/18526/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/e9c09b12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/e9c09b12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/e9c09b12

Branch: refs/heads/master
Commit: e9c09b121a0b623eb2b1d573554758b82bcc147b
Parents: 130bc48
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Feb 28 15:02:06 2014 -0800
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Feb 28 15:02:06 2014 -0800

----------------------------------------------------------------------
 .../org/apache/aurora/scheduler/base/Query.java | 31 ++++++++++---
 .../aurora/scheduler/http/Maintenance.java      |  4 +-
 .../scheduler/storage/mem/MemTaskStore.java     | 46 +++++++++++---------
 src/main/python/apache/aurora/client/api/sla.py |  3 +-
 .../thrift/org/apache/aurora/gen/api.thrift     |  2 +-
 .../scheduler/storage/mem/MemTaskStoreTest.java | 12 +++++
 .../org/apache/aurora/gen/api.thrift.md5        |  2 +-
 7 files changed, 67 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9c09b12/src/main/java/org/apache/aurora/scheduler/base/Query.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Query.java b/src/main/java/org/apache/aurora/scheduler/base/Query.java
index b9f207c..d6f27fd 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Query.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Query.java
@@ -111,8 +111,12 @@ public final class Query {
     return unscoped().byId(taskIds);
   }
 
-  public static Builder slaveScoped(String slaveHost) {
-    return unscoped().bySlave(slaveHost);
+  public static Builder slaveScoped(String slaveHost, String... slaveHosts) {
+    return unscoped().bySlave(slaveHost, slaveHosts);
+  }
+
+  public static Builder slaveScoped(Iterable<String> slaveHosts) {
+    return unscoped().bySlave(slaveHosts);
   }
 
   public static Builder statusScoped(ScheduleStatus status, ScheduleStatus... statuses) {
@@ -257,16 +261,31 @@ public final class Query {
     }
 
     /**
-     * Returns a new builder scoped to the slave uniquely identified by the given slaveHost. A
+     * Returns a new builder scoped to the slaves uniquely identified by the given slaveHosts. A
      * builder can only be scoped to slaves once.
      *
      * @param slaveHost The hostname of the slave to scope the query to.
-     * @return A new Builder scoped to the given slave.
+     * @param slaveHosts Additional slave hostnames to scope this query to (they are ORed together).
+     * @return A new Builder scoped to the given slaves.
      */
-    public Builder bySlave(String slaveHost) {
+    public Builder bySlave(String slaveHost, String... slaveHosts) {
       checkNotNull(slaveHost);
 
-      return new Builder(query.deepCopy().setSlaveHost(slaveHost));
+      return bySlave(ImmutableSet.<String>builder().add(slaveHost).add(slaveHosts).build());
+    }
+
+    /**
+     * Creates a new builder scoped to slaveHosts.
+     *
+     * @see Builder#bySlave(String, String...)
+     *
+     * @param slaveHosts The slave hostnames to scope this builder to (they are ORed together).
+     * @return A new Builder scoped to the slaveHosts.
+     */
+    public Builder bySlave(Iterable<String> slaveHosts) {
+      checkNotNull(slaveHosts);
+
+      return new Builder(query.deepCopy().setSlaveHosts(ImmutableSet.copyOf(slaveHosts)));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9c09b12/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java b/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
index 7337044..789eaaf 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
@@ -78,9 +78,7 @@ public class Maintenance {
 
   private Multimap<String, String> getTasksByHosts(StoreProvider provider, Iterable<String> hosts) {
     ImmutableSet.Builder<IScheduledTask> drainingTasks = ImmutableSet.builder();
-    for (String host : hosts) {
-      drainingTasks.addAll(provider.getTaskStore().fetchTasks(Query.slaveScoped(host).active()));
-    }
+    drainingTasks.addAll(provider.getTaskStore().fetchTasks(Query.slaveScoped(hosts).active()));
     return Multimaps.transformValues(
         Multimaps.index(drainingTasks.build(), Tasks.SCHEDULED_TO_SLAVE_HOST),
         Tasks.SCHEDULED_TO_ID);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9c09b12/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
index d1ab503..421b330 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
@@ -72,18 +72,19 @@ class MemTaskStore implements TaskStore.Mutable {
 
   private final long slowQueryThresholdNanos = SLOW_QUERY_LOG_THRESHOLD.get().as(Time.NANOSECONDS);
 
-  private static final Function<Query.Builder, Optional<IJobKey>> QUERY_TO_JOB_KEY =
-      new Function<Query.Builder, Optional<IJobKey>>() {
+  private static final Function<Query.Builder, Optional<Set<IJobKey>>> QUERY_TO_JOB_KEY =
+      new Function<Query.Builder, Optional<Set<IJobKey>>>() {
         @Override
-        public Optional<IJobKey> apply(Query.Builder query) {
-          return JobKeys.from(query);
+        public Optional<Set<IJobKey>> apply(Query.Builder query) {
+          Optional<IJobKey> jobkey = JobKeys.from(query);
+          return jobkey.isPresent() ? Optional.of(jobkey.asSet()) : Optional.<Set<IJobKey>>absent();
         }
       };
-  private static final Function<Query.Builder, Optional<String>> QUERY_TO_SLAVE_HOST =
-      new Function<Query.Builder, Optional<String>>() {
+  private static final Function<Query.Builder, Optional<Set<String>>> QUERY_TO_SLAVE_HOST =
+      new Function<Query.Builder, Optional<Set<String>>>() {
         @Override
-        public Optional<String> apply(Query.Builder query) {
-          return Optional.fromNullable(query.get().getSlaveHost());
+        public Optional<Set<String>> apply(Query.Builder query) {
+          return Optional.fromNullable(query.get().getSlaveHosts());
         }
       };
 
@@ -265,8 +266,8 @@ class MemTaskStore implements TaskStore.Mutable {
             return false;
           }
         }
-        if (!StringUtils.isEmpty(query.getSlaveHost())) {
-          if (!query.getSlaveHost().equals(task.getAssignedTask().getSlaveHost())) {
+        if (query.getSlaveHostsSize() > 0) {
+          if (!query.getSlaveHosts().contains(task.getAssignedTask().getSlaveHost())) {
             return false;
           }
         }
@@ -365,19 +366,19 @@ class MemTaskStore implements TaskStore.Mutable {
     private final Multimap<K, String> index =
         Multimaps.synchronizedSetMultimap(HashMultimap.<K, String>create());
     private final Function<IScheduledTask, K> indexer;
-    private final Function<Query.Builder, Optional<K>> queryExtractor;
+    private final Function<Query.Builder, Optional<Set<K>>> queryExtractor;
     private final AtomicLong hitCount;
 
     /**
      * Creates a secondary index that will extract keys from tasks using the provided indexer.
      *
      * @param indexer Indexing function.
-     * @param queryExtractor Function to extract the key relevant to a query.
-     * @param hitCount Counter for number of times the seconary index applies to a query.
+     * @param queryExtractor Function to extract the keys relevant to a query.
+     * @param hitCount Counter for number of times the secondary index applies to a query.
      */
     SecondaryIndex(
         Function<IScheduledTask, K> indexer,
-        Function<Query.Builder, Optional<K>> queryExtractor,
+        Function<Query.Builder, Optional<Set<K>>> queryExtractor,
         AtomicLong hitCount) {
 
       this.indexer = indexer;
@@ -416,12 +417,17 @@ class MemTaskStore implements TaskStore.Mutable {
       }
     }
 
-    private final Function<K, Iterable<String>> lookup = new Function<K, Iterable<String>>() {
-      @Override
-      public Iterable<String> apply(K key) {
-        hitCount.incrementAndGet();
-        return index.get(key);
-      }
+    private final Function<Set<K>, Iterable<String>> lookup =
+        new Function<Set<K>, Iterable<String>>() {
+          @Override
+          public Iterable<String> apply(Set<K> keys) {
+            hitCount.incrementAndGet();
+            ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+            for (K key : keys) {
+              builder.addAll(index.get(key));
+            }
+            return builder.build();
+          }
     };
 
     Optional<Iterable<String>> getMatches(Query.Builder query) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9c09b12/src/main/python/apache/aurora/client/api/sla.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/sla.py b/src/main/python/apache/aurora/client/api/sla.py
index a7c3158..71013cd 100644
--- a/src/main/python/apache/aurora/client/api/sla.py
+++ b/src/main/python/apache/aurora/client/api/sla.py
@@ -254,10 +254,9 @@ class Sla(object):
     check_and_log_response(resp)
     return resp.result.scheduleStatusResult.tasks
 
-  def _create_task_query(self, job_key=None, host=None):
+  def _create_task_query(self, job_key=None):
     return TaskQuery(
         owner=Identity(role=job_key.role) if job_key else None,
         environment=job_key.env if job_key else None,
         jobName=job_key.name if job_key else None,
-        slaveHost=host,
         statuses=SLA_LIVE_STATES)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9c09b12/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/src/main/thrift/org/apache/aurora/gen/api.thrift b/src/main/thrift/org/apache/aurora/gen/api.thrift
index 3ee24c7..f9fc6bc 100644
--- a/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -359,8 +359,8 @@ struct TaskQuery {
   2: string jobName
   4: set<string> taskIds
   5: set<ScheduleStatus> statuses
-  6: string slaveHost
   7: set<i32> instanceIds
+  10: set<string> slaveHosts
 }
 
 struct HostStatus {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9c09b12/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
index 884f589..0c1b144 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
@@ -107,6 +107,18 @@ public class MemTaskStoreTest {
   }
 
   @Test
+  public void testQueryBySlaveHost() {
+    String hostA = "slaveA";
+    String hostB = "slaveB";
+    final IScheduledTask a = setHost(makeTask("a", "role", "env", "job"), Optional.of(hostA));
+    final IScheduledTask b = setHost(makeTask("b", "role", "env", "job"), Optional.of(hostB));
+    store.saveTasks(ImmutableSet.of(a, b));
+
+    assertQueryResults(Query.slaveScoped(hostA), a);
+    assertQueryResults(Query.slaveScoped(hostA, hostB), a, b);
+  }
+
+  @Test
   public void testMutate() {
     store.saveTasks(ImmutableSet.of(TASK_A, TASK_B, TASK_C, TASK_D));
     assertQueryResults(Query.statusScoped(RUNNING));

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9c09b12/src/test/resources/org/apache/aurora/gen/api.thrift.md5
----------------------------------------------------------------------
diff --git a/src/test/resources/org/apache/aurora/gen/api.thrift.md5 b/src/test/resources/org/apache/aurora/gen/api.thrift.md5
index 4e6c51d..08c4c2a 100644
--- a/src/test/resources/org/apache/aurora/gen/api.thrift.md5
+++ b/src/test/resources/org/apache/aurora/gen/api.thrift.md5
@@ -1 +1 @@
-7645e899bf5b7f8ad2dcdbecae7f91e1
+7bf1d42fb84abddbfd8f580dd7dfdb03