You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/10/24 01:07:48 UTC

[1/2] git commit: Preparing for Identity struct deprecation (scheduler).

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 5d9be7339 -> 06935c042


Preparing for Identity struct deprecation (scheduler).

Bugs closed: AURORA-84

Reviewed at https://reviews.apache.org/r/26762/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/18ae0ab9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/18ae0ab9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/18ae0ab9

Branch: refs/heads/master
Commit: 18ae0ab9696e3565cf57f6a2550c61142e76bee5
Parents: 5d9be73
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Thu Oct 23 16:00:10 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Oct 23 16:00:10 2014 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/MesosTaskFactory.java      |  2 +-
 .../aurora/scheduler/TaskIdGenerator.java       |  6 +-
 .../org/apache/aurora/scheduler/TaskVars.java   |  2 +-
 .../aurora/scheduler/async/Preemptor.java       |  2 +-
 .../apache/aurora/scheduler/base/JobKeys.java   | 14 +---
 .../org/apache/aurora/scheduler/base/Query.java | 25 ++++----
 .../org/apache/aurora/scheduler/base/Tasks.java |  2 +-
 .../configuration/ConfigurationManager.java     | 46 ++++++++------
 .../aurora/scheduler/http/Utilization.java      |  2 +-
 .../aurora/scheduler/quota/QuotaManager.java    |  5 +-
 .../aurora/scheduler/sla/SlaAlgorithm.java      |  3 +-
 .../scheduler/state/StateManagerImpl.java       |  3 +-
 .../scheduler/storage/StorageBackfill.java      | 31 ++++++++-
 .../scheduler/storage/mem/MemTaskStore.java     | 15 ++---
 .../thrift/SchedulerThriftInterface.java        | 67 ++++++++++----------
 .../updater/JobUpdateControllerImpl.java        |  2 +-
 .../thrift/org/apache/aurora/gen/api.thrift     | 14 ++--
 .../scheduler/MesosTaskFactoryImplTest.java     |  2 +
 .../apache/aurora/scheduler/TaskVarsTest.java   |  2 +
 .../aurora/scheduler/app/SchedulerIT.java       |  2 +
 .../scheduler/async/PreemptorImplTest.java      |  2 +
 .../scheduler/async/TaskHistoryPrunerTest.java  |  2 +
 .../scheduler/async/TaskSchedulerImplTest.java  |  2 +
 .../scheduler/async/TaskSchedulerTest.java      |  2 +
 .../scheduler/cron/quartz/QuartzTestUtil.java   |  1 +
 .../scheduler/quota/QuotaManagerImplTest.java   | 29 +++++----
 .../aurora/scheduler/sla/SlaTestUtil.java       |  2 +
 .../scheduler/state/StateManagerImplTest.java   |  2 +
 .../scheduler/stats/ResourceCounterTest.java    |  2 +
 .../scheduler/storage/StorageBackfillTest.java  |  1 +
 .../scheduler/storage/mem/MemTaskStoreTest.java |  2 +
 .../thrift/SchedulerThriftInterfaceTest.java    | 42 ++++++++++--
 .../updater/JobUpdateEventSubscriberTest.java   | 29 +++++----
 .../aurora/scheduler/updater/JobUpdaterIT.java  |  2 +
 34 files changed, 224 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
index 83d0406..1d974b1 100644
--- a/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
@@ -100,7 +100,7 @@ public interface MesosTaskFactory {
     }
 
     public static String getJobSourceName(ITaskConfig task) {
-      return getJobSourceName(JobKeys.from(task));
+      return getJobSourceName(task.getJob());
     }
 
     public static String getInstanceSourceName(ITaskConfig task, int instanceId) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/main/java/org/apache/aurora/scheduler/TaskIdGenerator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskIdGenerator.java b/src/main/java/org/apache/aurora/scheduler/TaskIdGenerator.java
index 5c75cc8..bcff437 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskIdGenerator.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskIdGenerator.java
@@ -51,11 +51,11 @@ public interface TaskIdGenerator {
       return new StringBuilder()
           .append(clock.nowMillis())               // Allows chronological sorting.
           .append(sep)
-          .append(task.getOwner().getRole())       // Identification and collision prevention.
+          .append(task.getJob().getRole())       // Identification and collision prevention.
           .append(sep)
-          .append(task.getEnvironment())
+          .append(task.getJob().getEnvironment())
           .append(sep)
-          .append(task.getJobName())
+          .append(task.getJob().getName())
           .append(sep)
           .append(instanceId)                      // Collision prevention within job.
           .append(sep)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/main/java/org/apache/aurora/scheduler/TaskVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskVars.java b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
index f1ab934..3ebb8d0 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskVars.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
@@ -98,7 +98,7 @@ class TaskVars implements EventSubscriber {
     return String.format(
         "tasks_%s_%s",
         status,
-        JobKeys.canonicalString(JobKeys.from(task.getAssignedTask().getTask())));
+        JobKeys.canonicalString(task.getAssignedTask().getTask().getJob()));
   }
 
   private static final Predicate<IAttribute> IS_RACK = new Predicate<IAttribute>() {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index e9f2515..e3e261d 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -392,7 +392,7 @@ public interface Preemptor {
     }
 
     private static String getRole(IAssignedTask task) {
-      return task.getTask().getOwner().getRole();
+      return task.getTask().getJob().getRole();
     }
 
     private static Predicate<Integer> greaterThan(final int value) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java b/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
index a76c3fa..a5ffa5e 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
@@ -30,7 +30,6 @@ import org.apache.aurora.gen.TaskQuery;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -123,17 +122,6 @@ public final class JobKeys {
   }
 
   /**
-   * Attempts to create a valid JobKey from the given task.
-   *
-   * @param task The task to create job key from.
-   * @return A valid JobKey if it can be created.
-   * @throws IllegalArgumentException if the key fails to validate.
-   */
-  public static IJobKey from(ITaskConfig task) throws IllegalArgumentException {
-    return from(task.getOwner().getRole(), task.getEnvironment(), task.getJobName());
-  }
-
-  /**
    * Create a "/"-delimited representation of job key usable as a unique identifier in this cluster.
    *
    * It is guaranteed that {@code k.equals(JobKeys.parse(JobKeys.canonicalString(k))}.
@@ -175,7 +163,7 @@ public final class JobKeys {
 
       if (taskQuery.isSetJobName()) {
         builder.add(from(
-            taskQuery.getOwner().getRole(),
+            taskQuery.getRole(),
             taskQuery.getEnvironment(),
             taskQuery.getJobName()));
       }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/main/java/org/apache/aurora/scheduler/base/Query.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Query.java b/src/main/java/org/apache/aurora/scheduler/base/Query.java
index d518acb..a6ff14a 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Query.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Query.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.primitives.Ints;
 
-import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.TaskQuery;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
@@ -54,8 +53,7 @@ public final class Query {
    */
   public static boolean isJobScoped(Builder taskQuery) {
     TaskQuery q = taskQuery.get();
-    return q.isSetOwner() && q.getOwner().isSetRole() && q.isSetEnvironment() && q.isSetJobName()
-        || q.isSetJobKeys();
+    return q.isSetRole() && q.isSetEnvironment() && q.isSetJobName() || q.isSetJobKeys();
   }
 
   /**
@@ -153,7 +151,15 @@ public final class Query {
     }
 
     Builder(final TaskQuery query) {
-      this.query = requireNonNull(query); // It is expected that the caller calls deepCopy.
+      // It is expected that the caller calls deepCopy.
+      // TODO(maxim): Safe to keep only Role here as TaskQuery is not returned back to the client.
+      // Remove in 0.7.0. (AURORA-749)
+      if (query.isSetOwner()) {
+        query.setRole(query.getOwner().getRole());
+        query.unsetOwner();
+      }
+
+      this.query = query;
     }
 
     /**
@@ -226,8 +232,7 @@ public final class Query {
     public Builder byRole(String role) {
       requireNonNull(role);
 
-      return new Builder(
-          query.deepCopy().setOwner(new Identity().setRole(role)));
+      return new Builder(query.deepCopy().setRole(role));
     }
 
     /**
@@ -243,9 +248,7 @@ public final class Query {
       requireNonNull(environment);
 
       return new Builder(
-          query.deepCopy()
-              .setOwner(new Identity().setRole(role))
-              .setEnvironment(environment));
+          query.deepCopy().setRole(role).setEnvironment(environment));
     }
 
     /**
@@ -335,7 +338,7 @@ public final class Query {
 
       return new Builder(
           query.deepCopy()
-              .setOwner(new Identity().setRole(jobKey.getRole()))
+              .setRole(jobKey.getRole())
               .setEnvironment(jobKey.getEnvironment())
               .setJobName(jobKey.getName())
               .setInstanceIds(ImmutableSet.<Integer>builder()
@@ -359,7 +362,7 @@ public final class Query {
 
       return new Builder(
           query.deepCopy()
-              .setOwner(new Identity().setRole(jobKey.getRole()))
+              .setRole(jobKey.getRole())
               .setEnvironment(jobKey.getEnvironment())
               .setJobName(jobKey.getName())
               .setInstanceIds(ImmutableSet.copyOf(instanceIds)));

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
index 6ad7927..a2997f5 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
@@ -97,7 +97,7 @@ public final class Tasks {
       new Function<ITaskConfig, IJobKey>() {
         @Override
         public IJobKey apply(ITaskConfig task) {
-          return JobKeys.from(task);
+          return task.getJob();
         }
       };
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
index 5871dca..8cf845f 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
@@ -239,25 +239,16 @@ public final class ConfigurationManager {
 
     JobConfiguration builder = job.newBuilder();
 
-    assertOwnerValidity(job.getOwner());
-
     if (!JobKeys.isValid(job.getKey())) {
       throw new TaskDescriptionException("Job key " + job.getKey() + " is invalid.");
     }
-    if (!job.getKey().getRole().equals(job.getOwner().getRole())) {
-      throw new TaskDescriptionException("Role in job key must match job owner.");
-    }
-    if (!isGoodIdentifier(job.getKey().getRole())) {
-      throw new TaskDescriptionException(
-          "Job role contains illegal characters: " + job.getKey().getRole());
-    }
-    if (!isGoodIdentifier(job.getKey().getEnvironment())) {
-      throw new TaskDescriptionException(
-          "Job environment contains illegal characters: " + job.getKey().getEnvironment());
-    }
-    if (!isGoodIdentifier(job.getKey().getName())) {
-      throw new TaskDescriptionException(
-          "Job name contains illegal characters: " + job.getKey().getName());
+
+    if (job.isSetOwner()) {
+      assertOwnerValidity(job.getOwner());
+
+      if (!job.getKey().getRole().equals(job.getOwner().getRole())) {
+        throw new TaskDescriptionException("Role in job key must match job owner.");
+      }
     }
 
     builder.setTaskConfig(
@@ -293,8 +284,6 @@ public final class ConfigurationManager {
 
     maybeFillLinks(builder);
 
-    assertOwnerValidity(config.getOwner());
-
     if (!isGoodIdentifier(config.getJobName())) {
       throw new TaskDescriptionException(
           "Job name contains illegal characters: " + config.getJobName());
@@ -305,6 +294,27 @@ public final class ConfigurationManager {
           "Environment contains illegal characters: " + config.getEnvironment());
     }
 
+    if (config.isSetJob()) {
+      if (!JobKeys.isValid(config.getJob())) {
+        // Job key is set but invalid
+        throw new TaskDescriptionException("Job key " + config.getJob() + " is invalid.");
+      }
+
+      if (!config.getJob().getRole().equals(config.getOwner().getRole())) {
+        // Both owner and job key are set but don't match
+        throw new TaskDescriptionException("Role must match job owner.");
+      }
+    } else {
+      // TODO(maxim): Make sure both key and owner are populated to support older clients.
+      // Remove in 0.7.0. (AURORA-749).
+      // Job key is not set -> populate from owner, environment and name
+      assertOwnerValidity(config.getOwner());
+      builder.setJob(JobKeys.from(
+          config.getOwner().getRole(),
+          config.getEnvironment(),
+          config.getJobName()).newBuilder());
+    }
+
     if (!builder.isSetExecutorConfig()) {
       throw new TaskDescriptionException("Configuration may not be null");
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/main/java/org/apache/aurora/scheduler/http/Utilization.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Utilization.java b/src/main/java/org/apache/aurora/scheduler/http/Utilization.java
index a0cb7bf..10ded0a 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Utilization.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Utilization.java
@@ -200,7 +200,7 @@ public class Utilization {
     Function<ITaskConfig, Display> toKey = new Function<ITaskConfig, Display>() {
       @Override
       public Display apply(ITaskConfig task) {
-        String role = task.getOwner().getRole();
+        String role = task.getJob().getRole();
         return new Display(role, metric + "/" + role);
       }
     };

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index 5f08997..3fbab8b 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -34,7 +34,6 @@ import com.google.inject.Inject;
 
 import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ResourceAggregates;
 import org.apache.aurora.scheduler.base.Tasks;
@@ -151,7 +150,7 @@ public interface QuotaManager {
         return new QuotaCheckResult(SUFFICIENT_QUOTA);
       }
 
-      QuotaInfo quotaInfo = getQuotaInfo(template.getOwner().getRole());
+      QuotaInfo quotaInfo = getQuotaInfo(template.getJob().getRole());
 
       return QuotaCheckResult.greaterOrEqual(
           quotaInfo.getQuota(),
@@ -255,7 +254,7 @@ public interface QuotaManager {
         @Override
         public boolean apply(IScheduledTask input) {
           Optional<IJobUpdate> update = Optional.fromNullable(
-              roleJobUpdates.get(JobKeys.from(input.getAssignedTask().getTask())));
+              roleJobUpdates.get(input.getAssignedTask().getTask().getJob()));
 
           if (update.isPresent()) {
             IJobUpdateInstructions instructions = update.get().getInstructions();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java
index 2d27ad9..eae79d5 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java
@@ -33,7 +33,6 @@ import com.google.common.collect.Range;
 import com.twitter.common.collections.Pair;
 
 import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -284,7 +283,7 @@ interface SlaAlgorithm {
           @Override
           public InstanceId apply(IScheduledTask task) {
             return new InstanceId(
-                JobKeys.from(task.getAssignedTask().getTask()),
+                task.getAssignedTask().getTask().getJob(),
                 task.getAssignedTask().getInstanceId());
           }
         };

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 58b94c2..6d2ac49 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -48,7 +48,6 @@ import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.async.RescheduleCalculator;
-import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
@@ -132,7 +131,7 @@ public class StateManagerImpl implements StateManager {
       @Override
       protected void execute(MutableStoreProvider storeProvider) {
           ImmutableSet<IScheduledTask> existingTasks = storeProvider.getTaskStore().fetchTasks(
-            Query.jobScoped(JobKeys.from(task)).active());
+            Query.jobScoped(task.getJob()).active());
 
         Set<Integer> existingInstanceIds =
             FluentIterable.from(existingTasks).transform(Tasks.SCHEDULED_TO_INSTANCE_ID).toSet();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
index 3b3cef2..3cadbaf 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
@@ -25,17 +25,18 @@ import com.twitter.common.stats.Stats;
 import com.twitter.common.util.Clock;
 
 import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 /**
  * Utility class to contain and perform storage backfill operations.
@@ -47,6 +48,9 @@ public final class StorageBackfill {
   private static final AtomicLong SHARD_SANITY_CHECK_FAILS =
       Stats.exportLong("shard_sanity_check_failures");
 
+  private static final AtomicLong BACKFILLED_TASK_CONFIG_KEYS =
+      Stats.exportLong("task_config_keys_backfilled");
+
   private StorageBackfill() {
     // Utility class.
   }
@@ -67,9 +71,8 @@ public final class StorageBackfill {
 
     if (Tasks.isActive(task.getStatus())) {
       // Perform a sanity check on the number of active shards.
-      TaskConfig config = task.getAssignedTask().getTask();
       Query.Builder query = Query.instanceScoped(
-          JobKeys.from(config.getOwner().getRole(), config.getEnvironment(), config.getJobName()),
+          IJobKey.build(task.getAssignedTask().getTask().getJob()),
           task.getAssignedTask().getInstanceId())
           .active();
       Set<String> activeTasksInShard = FluentIterable.from(taskStore.fetchTasks(query))
@@ -122,6 +125,28 @@ public final class StorageBackfill {
   public static void backfill(final MutableStoreProvider storeProvider, final Clock clock) {
     backfillJobDefaults(storeProvider.getJobStore());
 
+    // Backfilling job keys has to be done in a separate transaction to ensure follow up scoped
+    // Query calls work against upgraded MemTaskStore, which does not support deprecated fields.
+    LOG.info("Backfilling task config job keys.");
+    storeProvider.getUnsafeTaskStore().mutateTasks(Query.unscoped(), new TaskMutation() {
+      @Override
+      public IScheduledTask apply(final IScheduledTask task) {
+        if (!task.getAssignedTask().getTask().isSetJob()) {
+          ScheduledTask builder = task.newBuilder();
+          TaskConfig config = builder.getAssignedTask().getTask();
+          config.setJob(new JobKey()
+              .setRole(config.getOwner().getRole())
+              .setEnvironment(config.getEnvironment())
+              .setName(config.getJobName()));
+
+          BACKFILLED_TASK_CONFIG_KEYS.incrementAndGet();
+          return IScheduledTask.build(builder);
+        }
+
+        return task;
+      }
+    });
+
     LOG.info("Performing shard uniqueness sanity check.");
     storeProvider.getUnsafeTaskStore().mutateTasks(Query.unscoped(), new TaskMutation() {
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
index 3717623..f7f2841 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
@@ -231,15 +231,10 @@ class MemTaskStore implements TaskStore.Mutable {
       public boolean apply(Task canonicalTask) {
         IScheduledTask task = canonicalTask.storedTask;
         ITaskConfig config = task.getAssignedTask().getTask();
-        if (query.getOwner() != null) {
-          if (!StringUtils.isBlank(query.getOwner().getRole())
-              && !query.getOwner().getRole().equals(config.getOwner().getRole())) {
-            return false;
-          }
-          if (!StringUtils.isBlank(query.getOwner().getUser())
-              && !query.getOwner().getUser().equals(config.getOwner().getUser())) {
-            return false;
-          }
+        if (query.getRole() != null
+            && !StringUtils.isBlank(query.getRole())
+            && !query.getRole().equals(config.getJob().getRole())) {
+          return false;
         }
         if (query.getEnvironment() != null
             && !query.getEnvironment().equals(config.getEnvironment())) {
@@ -250,7 +245,7 @@ class MemTaskStore implements TaskStore.Mutable {
         }
 
         if (query.getJobKeysSize() > 0
-            && !query.getJobKeys().contains(JobKeys.from(config).newBuilder())) {
+            && !query.getJobKeys().contains(config.getJob().newBuilder())) {
           return false;
         }
         if (query.getTaskIds() != null && !query.getTaskIds().contains(Tasks.id(task))) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 610fe02..175bba2 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -211,7 +211,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       new Function<ITaskConfig, String>() {
         @Override
         public String apply(ITaskConfig task) {
-          return task.getOwner().getRole();
+          return task.getJob().getRole();
         }
       },
       Tasks.SCHEDULED_TO_INFO);
@@ -283,7 +283,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     try {
       sessionValidator.checkAuthenticated(
           session,
-          ImmutableSet.of(mutableJob.getOwner().getRole()));
+          ImmutableSet.of(mutableJob.getKey().getRole()));
       sanitized = SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(mutableJob));
     } catch (AuthFailedException e) {
       return errorResponse(AUTH_FAILED, e);
@@ -343,7 +343,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     requireNonNull(session);
 
     try {
-      sessionValidator.checkAuthenticated(session, ImmutableSet.of(job.getOwner().getRole()));
+      sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
     } catch (AuthFailedException e) {
       return errorResponse(AUTH_FAILED, e);
     }
@@ -418,7 +418,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     requireNonNull(session);
 
     try {
-      sessionValidator.checkAuthenticated(session, ImmutableSet.of(job.getOwner().getRole()));
+      sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
     } catch (AuthFailedException e) {
       return errorResponse(AUTH_FAILED, e);
     }
@@ -723,8 +723,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     Set<IJobKey> keys = JobKeys.from(taskQuery).or(ImmutableSet.<IJobKey>of());
     targetRoles.addAll(FluentIterable.from(keys).transform(JobKeys.TO_ROLE));
 
-    if (taskQuery.get().isSetOwner()) {
-      targetRoles.add(taskQuery.get().getOwner().getRole());
+    if (taskQuery.get().isSetRole()) {
+      targetRoles.add(taskQuery.get().getRole());
     }
     return sessionValidator.checkAuthenticated(session, targetRoles.build());
   }
@@ -1074,32 +1074,28 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     }
 
     if (existingJob.getKey().equals(rewrittenJob.getKey())) {
-      if (existingJob.getOwner().equals(rewrittenJob.getOwner())) {
-        Multimap<String, IJobConfiguration> matches = jobsByKey(jobStore, existingJob.getKey());
-        switch (matches.size()) {
-          case 0:
+      Multimap<String, IJobConfiguration> matches = jobsByKey(jobStore, existingJob.getKey());
+      switch (matches.size()) {
+        case 0:
+          error = Optional.of(
+              "No jobs found for key " + JobKeys.canonicalString(existingJob.getKey()));
+          break;
+
+        case 1:
+          Map.Entry<String, IJobConfiguration> match =
+              Iterables.getOnlyElement(matches.entries());
+          IJobConfiguration storedJob = match.getValue();
+          if (storedJob.equals(existingJob)) {
+            jobStore.saveAcceptedJob(match.getKey(), rewrittenJob);
+          } else {
             error = Optional.of(
-                "No jobs found for key " + JobKeys.canonicalString(existingJob.getKey()));
-            break;
-
-          case 1:
-            Map.Entry<String, IJobConfiguration> match =
-                Iterables.getOnlyElement(matches.entries());
-            IJobConfiguration storedJob = match.getValue();
-            if (storedJob.equals(existingJob)) {
-              jobStore.saveAcceptedJob(match.getKey(), rewrittenJob);
-            } else {
-              error = Optional.of(
-                  "CAS compare failed for " + JobKeys.canonicalString(storedJob.getKey()));
-            }
-            break;
-
-          default:
-            error = Optional.of("Multiple jobs found for key "
-                + JobKeys.canonicalString(existingJob.getKey()));
-        }
-      } else {
-        error = Optional.of("Disallowing rewrite attempting to change job owner.");
+                "CAS compare failed for " + JobKeys.canonicalString(storedJob.getKey()));
+          }
+          break;
+
+        default:
+          error = Optional.of("Multiple jobs found for key "
+              + JobKeys.canonicalString(existingJob.getKey()));
       }
     } else {
       error = Optional.of("Disallowing rewrite attempting to change job key.");
@@ -1202,7 +1198,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
               Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
 
           ImmutableSet<IScheduledTask> currentTasks = storeProvider.getTaskStore().fetchTasks(
-              Query.jobScoped(JobKeys.from(task)).active());
+              Query.jobScoped(task.getJob()).active());
 
           validateTaskLimits(
               task,
@@ -1362,8 +1358,11 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     requireNonNull(mutableRequest);
     requireNonNull(session);
 
-    final IJobKey job =
-        JobKeys.assertValid(JobKeys.from(ITaskConfig.build(mutableRequest.getTaskConfig())));
+    // TODO(maxim): Switch to key field instead when AURORA-749 is fixed.
+    final IJobKey job = JobKeys.assertValid(IJobKey.build(new JobKey()
+        .setRole(mutableRequest.getTaskConfig().getOwner().getRole())
+        .setEnvironment(mutableRequest.getTaskConfig().getEnvironment())
+        .setName(mutableRequest.getTaskConfig().getJobName())));
 
     JobUpdateSettings settings = requireNonNull(mutableRequest.getSettings());
     if (settings.getUpdateGroupSize() <= 0) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index 6ec130f..f918d15 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -219,7 +219,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
   public void instanceChangedState(final IScheduledTask updatedTask) {
     instanceChanged(
         InstanceKeys.from(
-            JobKeys.from(updatedTask.getAssignedTask().getTask()),
+            updatedTask.getAssignedTask().getTask().getJob(),
             updatedTask.getAssignedTask().getInstanceId()),
         Optional.of(updatedTask));
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/src/main/thrift/org/apache/aurora/gen/api.thrift b/src/main/thrift/org/apache/aurora/gen/api.thrift
index 7a4aa73..b91fca9 100644
--- a/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -45,6 +45,7 @@ const APIVersion CURRENT_API_VERSION = {'major': THRIFT_API_VERSION}
 // Aurora executor framework name.
 const string AURORA_EXECUTOR_NAME = 'AuroraExecutor'
 
+// TODO(maxim): Remove in 0.7.0. (AURORA-749)
 struct Identity {
   1: string role
   2: string user
@@ -187,11 +188,15 @@ struct ExecutorConfig {
 
 /** Description of the tasks contained within a job. */
 struct TaskConfig {
- // TODO(William Farner): Store a JobKey instead.
+ /** Job the task belongs to. */
+ 28: JobKey job
+ // TODO(maxim): Remove in 0.7.0. (AURORA-749)
  /** contains the role component of JobKey */
  17: Identity owner
+ // TODO(maxim): Remove in 0.7.0. (AURORA-749)
  /** contains the environment component of JobKey */
  26: string environment
+ // TODO(maxim): Remove in 0.7.0. (AURORA-749)
  /** contains the name component of JobKey */
   3: string jobName
   7: bool isService
@@ -242,8 +247,7 @@ struct JobConfiguration {
    * used to construct it server-side.
    */
   9: JobKey key
-  // TODO(William Farner): Deprecate Identity and
-  // use JobKey instead (MESOS-4006).
+  // TODO(maxim): Remove in 0.7.0. (AURORA-749)
   /** Owner of this job. */
   7: Identity owner
   /**
@@ -477,7 +481,9 @@ struct GetJobsResult {
  * (terms are AND'ed together).
  */
 struct TaskQuery {
-  8: Identity owner               // TODO(wfarner): Deprecate Identity
+  // TODO(maxim): Remove in 0.7.0. (AURORA-749)
+  8: Identity owner
+  14: string role
   9: string environment
   2: string jobName
   4: set<string> taskIds

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/test/java/org/apache/aurora/scheduler/MesosTaskFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/MesosTaskFactoryImplTest.java
index e969747..2e72e96 100644
--- a/src/test/java/org/apache/aurora/scheduler/MesosTaskFactoryImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/MesosTaskFactoryImplTest.java
@@ -19,6 +19,7 @@ import com.twitter.common.quantity.Data;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.MesosTaskFactory.ExecutorConfig;
 import org.apache.aurora.scheduler.MesosTaskFactory.MesosTaskFactoryImpl;
@@ -42,6 +43,7 @@ public class MesosTaskFactoryImplTest {
       .setTaskId("task-id")
       .setAssignedPorts(ImmutableMap.of("http", 80))
       .setTask(new TaskConfig()
+          .setJob(new JobKey("role", "environment", "job-name"))
           .setOwner(new Identity("role", "user"))
           .setEnvironment("environment")
           .setJobName("job-name")

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
index 371ae87..e091ca3 100644
--- a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
@@ -27,6 +27,7 @@ import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Attribute;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
@@ -127,6 +128,7 @@ public class TaskVarsTest extends EasyMockTest {
         .setAssignedTask(new AssignedTask()
             .setTaskId(TASK_ID)
             .setTask(new TaskConfig()
+                .setJob(new JobKey(ROLE_A, ENV, job))
                 .setJobName(job)
                 .setEnvironment(ENV)
                 .setOwner(new Identity(ROLE_A, ROLE_A + "-user"))));

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index c405d4c..91a92c6 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -63,6 +63,7 @@ import com.twitter.thrift.ServiceInstance;
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
@@ -284,6 +285,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
         .setAssignedTask(new AssignedTask()
             .setTaskId(id)
             .setTask(new TaskConfig()
+                .setJob(new JobKey("role-" + id, "test", "job-" + id))
                 .setJobName("job-" + id)
                 .setEnvironment("test")
                 .setExecutorConfig(new org.apache.aurora.gen.ExecutorConfig("AuroraExecutor", ""))

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
index 8ee84cd..18c21e3 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
@@ -35,6 +35,7 @@ import org.apache.aurora.gen.Attribute;
 import org.apache.aurora.gen.Constraint;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
@@ -581,6 +582,7 @@ public class PreemptorImplTest extends EasyMockTest {
     AssignedTask assignedTask = new AssignedTask()
         .setTaskId(taskId)
         .setTask(new TaskConfig()
+            .setJob(new JobKey(role, env, job))
             .setOwner(new Identity(role, role))
             .setPriority(priority)
             .setProduction(production)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
index b3e4ae3..9682c89 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
@@ -34,6 +34,7 @@ import com.twitter.common.util.testing.FakeClock;
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ExecutorConfig;
 import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
@@ -388,6 +389,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
         .setSlaveHost(SLAVE_HOST)
         .setTaskId(taskId)
         .setTask(new TaskConfig()
+            .setJob(new JobKey("role", "staging45", job))
             .setOwner(new Identity().setRole("role").setUser("user"))
             .setEnvironment("staging45")
             .setJobName(job)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 6534329..bb0b3b2 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -28,6 +28,7 @@ import com.twitter.common.util.testing.FakeClock;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
@@ -306,6 +307,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
             .setInstanceId(0)
             .setTaskId(taskId)
             .setTask(new TaskConfig()
+                .setJob(new JobKey("role-" + taskId, "env-" + taskId, "job-" + taskId))
                 .setJobName("job-" + taskId)
                 .setOwner(new Identity().setRole("role-" + taskId).setUser("user-" + taskId))
                 .setEnvironment("env-" + taskId))));

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 919c79e..ec60880 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -32,6 +32,7 @@ import com.twitter.common.util.testing.FakeClock;
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.HostStatus;
 import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
@@ -225,6 +226,7 @@ public class TaskSchedulerTest extends EasyMockTest {
             .setInstanceId(0)
             .setTaskId(taskId)
             .setTask(new TaskConfig()
+                .setJob(new JobKey("role-" + taskId, "env-" + taskId, "job-" + taskId))
                 .setJobName("job-" + taskId)
                 .setOwner(new Identity().setRole("role-" + taskId).setUser("user-" + taskId))
                 .setEnvironment("env-" + taskId))));

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java
index d2d3e86..2d74b32 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java
@@ -40,6 +40,7 @@ final class QuartzTestUtil {
           .setOwner(new Identity("role", "user"))
           .setKey(AURORA_JOB_KEY.newBuilder())
           .setTaskConfig(new TaskConfig()
+              .setJob(AURORA_JOB_KEY.newBuilder())
               .setOwner(new Identity("role", "user"))
               .setJobName(AURORA_JOB_KEY.getName())
               .setEnvironment(AURORA_JOB_KEY.getEnvironment())

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
index 33790b1..2fe2574 100644
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
@@ -24,6 +24,7 @@ import com.twitter.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.JobUpdate;
 import org.apache.aurora.gen.JobUpdateInstructions;
 import org.apache.aurora.gen.JobUpdateSummary;
@@ -32,7 +33,6 @@ import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ResourceAggregates;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
@@ -380,12 +380,12 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     String updateId = "u1";
     ITaskConfig config = taskConfig(2, 2, 2, true);
-    List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(updateId, JobKeys.from(config));
+    List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(updateId, config.getJob());
     IJobUpdate update = buildJobUpdate(summaries.get(0), config, 1, config, 1);
     JobUpdate builder = update.newBuilder();
     builder.getInstructions().unsetDesiredState();
 
-    expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(config.getOwner().getRole())))
+    expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(config.getJob().getRole())))
         .andReturn(summaries).times(2);
 
     expect(jobUpdateStore.fetchJobUpdate(updateId))
@@ -407,12 +407,12 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     String updateId = "u1";
     ITaskConfig config = taskConfig(2, 2, 2, true);
-    List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(updateId, JobKeys.from(config));
+    List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(updateId, config.getJob());
     IJobUpdate update = buildJobUpdate(summaries.get(0), config, 1, config, 1);
     JobUpdate builder = update.newBuilder();
     builder.getInstructions().setInitialState(ImmutableSet.<InstanceTaskConfig>of());
 
-    expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(config.getOwner().getRole())))
+    expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(config.getJob().getRole())))
         .andReturn(summaries).times(2);
 
     expect(jobUpdateStore.fetchJobUpdate(updateId))
@@ -434,12 +434,12 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     String updateId = "u1";
     ITaskConfig config = taskConfig(2, 2, 2, true);
-    List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(updateId, JobKeys.from(config));
+    List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(updateId, config.getJob());
     IJobUpdate update = buildJobUpdate(summaries.get(0), config, 1, config, 1);
     JobUpdate builder = update.newBuilder();
     builder.getInstructions().unsetDesiredState();
 
-    expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(config.getOwner().getRole())))
+    expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(config.getJob().getRole())))
         .andReturn(summaries).times(2);
 
     expect(jobUpdateStore.fetchJobUpdate(updateId))
@@ -465,7 +465,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     ITaskConfig config = taskConfig(1, 1, 1, true);
     IJobUpdate update = buildJobUpdate(
-        buildJobUpdateSummaries("u1", JobKeys.from(config)).get(0),
+        buildJobUpdateSummaries("u1", config.getJob()).get(0),
         taskConfig(2, 2, 2, true),
         1,
         config,
@@ -488,7 +488,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     ITaskConfig config = taskConfig(2, 2, 2, true);
     IJobUpdate update = buildJobUpdate(
-        buildJobUpdateSummaries("u1", JobKeys.from(config)).get(0),
+        buildJobUpdateSummaries("u1", config.getJob()).get(0),
         config,
         1,
         config,
@@ -514,7 +514,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     ITaskConfig config = taskConfig(2, 2, 2, true);
     IJobUpdate update = buildJobUpdate(
-        buildJobUpdateSummaries("u1", JobKeys.from(config)).get(0),
+        buildJobUpdateSummaries("u1", config.getJob()).get(0),
         config,
         1,
         config,
@@ -533,7 +533,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
   public void testCheckQuotaNewUpdateSkippedForNonProdDesiredState() {
     ITaskConfig config = taskConfig(2, 2, 2, false);
     IJobUpdate update = buildJobUpdate(
-        buildJobUpdateSummaries("u1", JobKeys.from(config)).get(0),
+        buildJobUpdateSummaries("u1", config.getJob()).get(0),
         taskConfig(2, 2, 2, true),
         1,
         config,
@@ -549,7 +549,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
   public void testCheckQuotaNewUpdateSkippedForEmptyDesiredState() {
     ITaskConfig config = taskConfig(2, 2, 2, true);
     IJobUpdate update = buildJobUpdate(
-        buildJobUpdateSummaries("u1", JobKeys.from(config)).get(0),
+        buildJobUpdateSummaries("u1", config.getJob()).get(0),
         config,
         1,
         config,
@@ -603,11 +603,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
       int times) {
 
     String updateId = "u1";
-    List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(updateId, JobKeys.from(initial));
+    List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(updateId, initial.getJob());
     IJobUpdate update =
         buildJobUpdate(summaries.get(0), initial, intialInstances, desired, desiredInstances);
 
-    expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(initial.getOwner().getRole())))
+    expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(initial.getJob().getRole())))
         .andReturn(summaries)
         .times(times);
 
@@ -679,6 +679,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
                 .setTaskId(taskId)
                 .setInstanceId(instanceId)
                 .setTask(new TaskConfig()
+                    .setJob(new JobKey(ROLE, ENV, jobName))
                     .setOwner(new Identity(ROLE, ROLE))
                     .setEnvironment(ENV)
                     .setJobName(jobName)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/test/java/org/apache/aurora/scheduler/sla/SlaTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/sla/SlaTestUtil.java b/src/test/java/org/apache/aurora/scheduler/sla/SlaTestUtil.java
index 21640f7..7fb6278 100644
--- a/src/test/java/org/apache/aurora/scheduler/sla/SlaTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/sla/SlaTestUtil.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Iterables;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
@@ -48,6 +49,7 @@ final class SlaTestUtil {
             .setSlaveHost("host")
             .setInstanceId(instanceId)
             .setTask(new TaskConfig()
+                .setJob(new JobKey("role", "env", "job"))
                 .setJobName("job")
                 .setIsService(true)
                 .setProduction(isProd)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index cf4a015..d8a6145 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -29,6 +29,7 @@ import com.twitter.common.util.testing.FakeClock;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
@@ -501,6 +502,7 @@ public class StateManagerImplTest extends EasyMockTest {
 
   private static ITaskConfig makeTask(Identity owner, String job) {
     return ITaskConfig.build(new TaskConfig()
+        .setJob(new JobKey(owner.getRole(), "devel", job))
         .setOwner(owner)
         .setEnvironment("devel")
         .setJobName(job)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java b/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
index fc12933..e5f3adf 100644
--- a/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Constraint;
 import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
@@ -167,6 +168,7 @@ public class ResourceCounterTest {
       Optional<String> dedicated) {
 
     TaskConfig task = new TaskConfig()
+        .setJob(new JobKey(role, "test", job))
         .setOwner(new Identity().setRole(role))
         .setEnvironment("test")
         .setJobName(job)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
index 0c1a271..cf9b0bd 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
@@ -123,6 +123,7 @@ public class StorageBackfillTest {
 
     // Since task fields are backfilled with defaults, additional flags should be filled.
     ITaskConfig expected = ITaskConfig.build(new TaskConfig(storedTask)
+        .setJob(JOB_KEY.newBuilder())
         .setProduction(false)
         .setMaxTaskFailures(1)
         .setExecutorConfig(EXECUTOR_CONFIG)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
index 581f639..c1217e2 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
@@ -34,6 +34,7 @@ import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ExecutorConfig;
 import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
@@ -456,6 +457,7 @@ public class MemTaskStoreTest {
             .setInstanceId(0)
             .setTaskId(id)
             .setTask(new TaskConfig()
+                .setJob(new JobKey(role, env, jobName))
                 .setJobName(jobName)
                 .setEnvironment(env)
                 .setOwner(new Identity(role, role))

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 3fde3f7..b42f6e2 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -330,6 +330,10 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
   @Test
   public void testCreateJobNoLock() throws Exception {
+    // Validate key is populated during sanitizing.
+    JobConfiguration jobConfig = makeProdJob();
+    jobConfig.getTaskConfig().unsetJob();
+
     IJobConfiguration job = IJobConfiguration.build(makeProdJob());
     SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
     expectAuth(ROLE, true);
@@ -347,7 +351,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
     control.replay();
 
-    assertOkResponse(thrift.createJob(job.newBuilder(), DEFAULT_LOCK, SESSION));
+    assertOkResponse(thrift.createJob(jobConfig, DEFAULT_LOCK, SESSION));
   }
 
   @Test
@@ -638,6 +642,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
     JobConfiguration sanitized = job.deepCopy();
     sanitized.getTaskConfig()
+        .setJob(JOB_KEY.newBuilder())
         .setNumCpus(1.0)
         .setPriority(0)
         .setRamMb(1024)
@@ -717,6 +722,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
         .setAssignedTask(new AssignedTask()
             .setTaskId(taskId)
             .setTask(new TaskConfig()
+                .setJob(JOB_KEY.newBuilder().setName(jobName))
                 .setOwner(ROLE_IDENTITY)
                 .setEnvironment("devel")
                 .setJobName(jobName))));
@@ -1071,7 +1077,10 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     cronJobManager.updateJob(anyObject(SanitizedCronJob.class));
     control.replay();
 
-    assertOkResponse(thrift.replaceCronTemplate(CRON_JOB, DEFAULT_LOCK, SESSION));
+    // Validate key is populated during sanitizing.
+    JobConfiguration jobConfig = CRON_JOB;
+    jobConfig.getTaskConfig().unsetJob();
+    assertOkResponse(thrift.replaceCronTemplate(jobConfig, DEFAULT_LOCK, SESSION));
   }
 
   @Test
@@ -1162,7 +1171,11 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(true);
     cronJobManager.updateJob(SanitizedCronJob.from(sanitized));
     control.replay();
-    assertResponse(OK, thrift.scheduleCronJob(CRON_JOB, DEFAULT_LOCK, SESSION));
+
+    // Validate key is populated during sanitizing.
+    JobConfiguration jobConfig = CRON_JOB;
+    jobConfig.getTaskConfig().unsetJob();
+    assertResponse(OK, thrift.scheduleCronJob(jobConfig, DEFAULT_LOCK, SESSION));
   }
 
   @Test
@@ -1390,6 +1403,10 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
     control.replay();
 
+    // Validate key is populated during sanitizing.
+    JobConfiguration requestConfig = oldJob.deepCopy();
+    requestConfig.getTaskConfig().unsetJob();
+
     RewriteConfigsRequest request = new RewriteConfigsRequest(
         ImmutableList.of(ConfigRewrite.jobRewrite(
             new JobConfigRewrite(oldJob, newJob))));
@@ -1454,6 +1471,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void testGetJobSummary() throws Exception {
     long nextCronRunMs = 100;
     TaskConfig ownedCronJobTask = nonProductionTask()
+        .setJob(JOB_KEY.newBuilder())
         .setJobName(JobKeys.TO_JOB_NAME.apply(JOB_KEY))
         .setOwner(ROLE_IDENTITY)
         .setEnvironment(JobKeys.TO_ENVIRONMENT.apply(JOB_KEY));
@@ -1470,6 +1488,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
         .setKey(JOB_KEY.newBuilder().setRole("other"))
         .setTaskConfig(ownedCronJobTask.deepCopy().setOwner(otherOwner));
     TaskConfig ownedImmediateTaskInfo = defaultTask(false)
+        .setJob(JOB_KEY.newBuilder().setName("immediate"))
         .setJobName("immediate")
         .setOwner(ROLE_IDENTITY);
     Set<JobConfiguration> ownedCronJobOnly = ImmutableSet.of(ownedCronJob);
@@ -1572,6 +1591,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
         .setKey(JOB_KEY.newBuilder().setRole("other"))
         .setTaskConfig(ownedCronJobTask.deepCopy().setOwner(otherOwner));
     TaskConfig ownedImmediateTaskInfo = defaultTask(false)
+        .setJob(JOB_KEY.newBuilder().setName("immediate"))
         .setJobName("immediate")
         .setOwner(ROLE_IDENTITY);
     Set<JobConfiguration> ownedCronJobOnly = ImmutableSet.of(ownedCronJob);
@@ -1637,6 +1657,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
         .setKey(jobKey2)
         .setTaskConfig(nonProductionTask());
     TaskConfig immediateTaskConfig = defaultTask(false)
+        .setJob(JOB_KEY.newBuilder().setName("immediate"))
         .setJobName("immediate")
         .setOwner(ROLE_IDENTITY);
     IScheduledTask immediateTask = IScheduledTask.build(new ScheduledTask()
@@ -1853,6 +1874,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     Set<JobConfiguration> crons = ImmutableSet.of(cronJobOne, cronJobTwo, cronJobThree);
 
     TaskConfig immediateTaskConfig = defaultTask(false)
+        .setJob(JOB_KEY.newBuilder().setName("immediate"))
         .setJobName("immediate")
         .setOwner(ROLE_IDENTITY);
     IScheduledTask task1 = IScheduledTask.build(new ScheduledTask()
@@ -1861,12 +1883,14 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
         .setAssignedTask(new AssignedTask().setTask(immediateTaskConfig.setNumCpus(2))));
 
     TaskConfig immediateTaskConfigTwo = defaultTask(false)
+        .setJob(JOB_KEY.newBuilder().setRole(BAZ_ROLE_IDENTITY.getRole()).setName("immediateTwo"))
         .setJobName("immediateTwo")
         .setOwner(BAZ_ROLE_IDENTITY);
     IScheduledTask task3 = IScheduledTask.build(new ScheduledTask()
         .setAssignedTask(new AssignedTask().setTask(immediateTaskConfigTwo)));
 
     TaskConfig immediateTaskConfigThree = defaultTask(false)
+        .setJob(JOB_KEY.newBuilder().setRole(BAZ_ROLE_IDENTITY.getRole()).setName("immediateThree"))
         .setJobName("immediateThree")
         .setOwner(BAZ_ROLE_IDENTITY);
     IScheduledTask task4 = IScheduledTask.build(new ScheduledTask()
@@ -1924,7 +1948,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   @Test
   public void testAddInstances() throws Exception {
     ITaskConfig populatedTask = ITaskConfig.build(populatedTask());
-    AddInstancesConfig config = createInstanceConfig(populatedTask.newBuilder());
     expectAuth(ROLE, true);
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
     lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
@@ -1936,6 +1959,9 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
     control.replay();
 
+    // Validate key is populated during sanitizing.
+    AddInstancesConfig config = createInstanceConfig(populatedTask.newBuilder());
+    config.getTaskConfig().unsetJob();
     assertOkResponse(thrift.addInstances(config, LOCK.newBuilder(), SESSION));
   }
 
@@ -2280,8 +2306,11 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
     control.replay();
 
-    Response response =
-        assertOkResponse(thrift.startJobUpdate(buildJobUpdateRequest(update), SESSION));
+    // Validate key is populated during sanitizing.
+    JobUpdateRequest request = buildJobUpdateRequest(update);
+    request.getTaskConfig().unsetJob();
+
+    Response response = assertOkResponse(thrift.startJobUpdate(request, SESSION));
     assertEquals(UPDATE_ID, response.getResult().getStartJobUpdateResult().getUpdateId());
   }
 
@@ -2740,6 +2769,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
   private static TaskConfig defaultTask(boolean production) {
     return new TaskConfig()
+        .setJob(JOB_KEY.newBuilder())
         .setOwner(new Identity(ROLE, USER))
         .setEnvironment("devel")
         .setJobName(JOB_NAME)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java
index 5242a43..c53cfe0 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java
@@ -37,22 +37,23 @@ import static org.easymock.EasyMock.expectLastCall;
 
 public class JobUpdateEventSubscriberTest extends EasyMockTest {
 
-  private static final IJobKey JOB_A = JobKeys.from("role", "env", "name");
+  private static final IJobKey JOB = JobKeys.from("role", "env", "name");
 
-  private static final IScheduledTask TASK_A = IScheduledTask.build(
+  private static final IScheduledTask TASK = IScheduledTask.build(
       new ScheduledTask()
           .setStatus(ScheduleStatus.PENDING)
           .setAssignedTask(
               new AssignedTask()
                   .setInstanceId(5)
                   .setTask(new TaskConfig()
-                      .setOwner(new Identity().setRole(JOB_A.getRole()))
-                      .setEnvironment(JOB_A.getEnvironment())
-                      .setJobName(JOB_A.getName()))));
+                      .setJob(JOB.newBuilder())
+                      .setOwner(new Identity().setRole(JOB.getRole()))
+                      .setEnvironment(JOB.getEnvironment())
+                      .setJobName(JOB.getName()))));
   private static final IInstanceKey INSTANCE_A = IInstanceKey.build(
       new InstanceKey()
-          .setJobKey(JOB_A.newBuilder())
-          .setInstanceId(TASK_A.getAssignedTask().getInstanceId()));
+          .setJobKey(JOB.newBuilder())
+          .setInstanceId(TASK.getAssignedTask().getInstanceId()));
 
   private JobUpdateController updater;
 
@@ -68,11 +69,11 @@ public class JobUpdateEventSubscriberTest extends EasyMockTest {
 
   @Test
   public void testStateChange() throws Exception {
-    updater.instanceChangedState(TASK_A);
+    updater.instanceChangedState(TASK);
 
     control.replay();
 
-    eventBus.post(TaskStateChange.initialized(TASK_A));
+    eventBus.post(TaskStateChange.initialized(TASK));
   }
 
   @Test
@@ -81,7 +82,7 @@ public class JobUpdateEventSubscriberTest extends EasyMockTest {
 
     control.replay();
 
-    eventBus.post(new TasksDeleted(ImmutableSet.of(TASK_A)));
+    eventBus.post(new TasksDeleted(ImmutableSet.of(TASK)));
   }
 
   @Test
@@ -97,7 +98,7 @@ public class JobUpdateEventSubscriberTest extends EasyMockTest {
   public void testHandlesExceptions() throws Exception {
     updater.systemResume();
     expectLastCall().andThrow(new RuntimeException());
-    updater.instanceChangedState(TASK_A);
+    updater.instanceChangedState(TASK);
     expectLastCall().andThrow(new RuntimeException());
     updater.instanceDeleted(INSTANCE_A);
     expectLastCall().andThrow(new RuntimeException());
@@ -105,8 +106,8 @@ public class JobUpdateEventSubscriberTest extends EasyMockTest {
     control.replay();
 
     eventBus.post(new SchedulerActive());
-    eventBus.post(TaskStateChange.initialized(TASK_A));
-    eventBus.post(new TasksDeleted(ImmutableSet.of(TASK_A)));
+    eventBus.post(TaskStateChange.initialized(TASK));
+    eventBus.post(new TasksDeleted(ImmutableSet.of(TASK)));
   }
 
   @Test
@@ -114,7 +115,7 @@ public class JobUpdateEventSubscriberTest extends EasyMockTest {
     control.replay();
 
     IScheduledTask task =
-        IScheduledTask.build(TASK_A.newBuilder().setStatus(ScheduleStatus.FAILED));
+        IScheduledTask.build(TASK.newBuilder().setStatus(ScheduleStatus.FAILED));
     eventBus.post(new TasksDeleted(ImmutableSet.of(task)));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/18ae0ab9/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index f739e6d..61b6b8e 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -44,6 +44,7 @@ import com.twitter.common.util.TruncatedBinaryBackoff;
 import org.apache.aurora.gen.ExecutorConfig;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.JobUpdate;
 import org.apache.aurora.gen.JobUpdateAction;
 import org.apache.aurora.gen.JobUpdateEvent;
@@ -978,6 +979,7 @@ public class JobUpdaterIT extends EasyMockTest {
 
   private static TaskConfig makeTaskConfig() {
     return new TaskConfig()
+        .setJob(new JobKey(JOB.newBuilder()))
         .setJobName(JOB.getName())
         .setEnvironment(JOB.getEnvironment())
         .setOwner(new Identity(JOB.getRole(), "user"))


[2/2] git commit: Preparing for Identity struct deprecation (client and executor).

Posted by ma...@apache.org.
Preparing for Identity struct deprecation (client and executor).

Bugs closed: AURORA-84

Reviewed at https://reviews.apache.org/r/26954/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/06935c04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/06935c04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/06935c04

Branch: refs/heads/master
Commit: 06935c04200fa78aea2ed60ee49ed234d796c6bd
Parents: 18ae0ab
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Thu Oct 23 16:00:38 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Oct 23 16:00:38 2014 -0700

----------------------------------------------------------------------
 .../aurora/client/api/instance_watcher.py       |  6 +-
 src/main/python/apache/aurora/client/api/sla.py | 19 ++---
 .../python/apache/aurora/client/cli/jobs.py     | 10 +--
 .../python/apache/aurora/client/cli/task.py     | 11 +--
 .../apache/aurora/client/commands/admin.py      |  4 +-
 .../apache/aurora/client/commands/core.py       | 20 ++---
 .../python/apache/aurora/client/commands/ssh.py | 11 +--
 src/main/python/apache/aurora/config/thrift.py  |  1 +
 .../apache/aurora/executor/aurora_executor.py   |  2 +-
 .../apache/aurora/executor/common/announcer.py  |  7 +-
 .../aurora/executor/thermos_task_runner.py      |  2 +-
 .../aurora/client/api/test_instance_watcher.py  | 18 ++---
 .../python/apache/aurora/client/api/test_sla.py | 12 ++-
 .../aurora/client/cli/test_api_from_cli.py      | 37 +--------
 .../apache/aurora/client/cli/test_diff.py       | 43 +---------
 .../apache/aurora/client/cli/test_status.py     | 84 +++++++-------------
 .../apache/aurora/client/cli/test_task_run.py   | 69 +---------------
 .../apache/aurora/client/cli/test_update.py     | 13 ++-
 .../python/apache/aurora/client/cli/util.py     | 36 ++++++++-
 .../apache/aurora/client/commands/test_diff.py  |  9 ++-
 .../apache/aurora/client/commands/test_ssh.py   | 10 ++-
 .../aurora/client/commands/test_status.py       | 10 ++-
 .../aurora/client/commands/test_update.py       | 14 +++-
 .../python/apache/aurora/config/test_thrift.py  | 10 ++-
 .../aurora/executor/common/test_announcer.py    | 12 ++-
 .../aurora/executor/test_thermos_executor.py    |  3 +-
 26 files changed, 187 insertions(+), 286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/main/python/apache/aurora/client/api/instance_watcher.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/instance_watcher.py b/src/main/python/apache/aurora/client/api/instance_watcher.py
index b390aa8..6ed8154 100644
--- a/src/main/python/apache/aurora/client/api/instance_watcher.py
+++ b/src/main/python/apache/aurora/client/api/instance_watcher.py
@@ -20,7 +20,7 @@ from twitter.common import log
 from .health_check import StatusHealthCheck
 from .task_util import StatusMuxHelper
 
-from gen.apache.aurora.api.ttypes import Identity, ScheduleStatus, TaskQuery
+from gen.apache.aurora.api.ttypes import ScheduleStatus, TaskQuery
 
 
 class Instance(object):
@@ -133,9 +133,7 @@ class InstanceWatcher(object):
 
   def _create_query(self, instance_ids):
     query = TaskQuery()
-    query.owner = Identity(role=self._job_key.role)
-    query.environment = self._job_key.environment
-    query.jobName = self._job_key.name
+    query.jobKeys = set([self._job_key])
     query.statuses = set([ScheduleStatus.RUNNING])
     query.instanceIds = instance_ids
     return query

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/main/python/apache/aurora/client/api/sla.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/sla.py b/src/main/python/apache/aurora/client/api/sla.py
index b9b6468..5855685 100644
--- a/src/main/python/apache/aurora/client/api/sla.py
+++ b/src/main/python/apache/aurora/client/api/sla.py
@@ -22,7 +22,7 @@ from apache.aurora.client.base import DEFAULT_GROUPING, format_response, group_h
 from apache.aurora.common.aurora_job_key import AuroraJobKey
 
 from gen.apache.aurora.api.constants import LIVE_STATES
-from gen.apache.aurora.api.ttypes import Identity, ResponseCode, ScheduleStatus, TaskQuery
+from gen.apache.aurora.api.ttypes import ResponseCode, ScheduleStatus, TaskQuery
 
 
 def job_key_from_scheduled(task, cluster):
@@ -32,28 +32,25 @@ def job_key_from_scheduled(task, cluster):
   task -- ScheduledTask to get job key from.
   cluster -- Cluster the task belongs to.
   """
+  config = task.assignedTask.task
   return AuroraJobKey(
       cluster=cluster.name,
-      role=task.assignedTask.task.owner.role,
-      env=task.assignedTask.task.environment,
-      name=task.assignedTask.task.jobName
+      role=config.job.role if config.job else config.owner.role,
+      env=config.job.environment if config.job else config.environment,
+      name=config.job.name if config.job else config.jobName
   )
 
 
-def task_query(job_key=None, hosts=None, job_keys=None):
+def task_query(hosts=None, job_keys=None):
   """Creates TaskQuery optionally scoped by a job(s) or hosts.
 
   Arguments:
-  job_key -- AuroraJobKey to scope the query by.
   hosts -- list of hostnames to scope the query by.
   job_keys -- list of AuroraJobKeys to scope the query by.
   """
   return TaskQuery(
-      owner=Identity(role=job_key.role) if job_key else None,
-      environment=job_key.env if job_key else None,
-      jobName=job_key.name if job_key else None,
       slaveHosts=set(hosts) if hosts else None,
-      jobKeys=set(k.to_thrift() for k in job_keys) if job_keys else None,
+      jobKeys=[k.to_thrift() for k in job_keys] if job_keys else None,
       statuses=LIVE_STATES)
 
 
@@ -302,7 +299,7 @@ class Sla(object):
     Arguments:
     job_key -- job to create a task uptime vector for.
     """
-    return JobUpTimeSlaVector(self._get_tasks(task_query(job_key=job_key)))
+    return JobUpTimeSlaVector(self._get_tasks(task_query(job_keys=[job_key])))
 
   def get_domain_uptime_vector(self, cluster, min_instance_count, hosts=None):
     """Returns a DomainUpTimeSlaVector object with all available job uptimes.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/main/python/apache/aurora/client/cli/jobs.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/jobs.py b/src/main/python/apache/aurora/client/cli/jobs.py
index 493ba21..625cb80 100644
--- a/src/main/python/apache/aurora/client/cli/jobs.py
+++ b/src/main/python/apache/aurora/client/cli/jobs.py
@@ -562,12 +562,12 @@ class StatusCommand(Verb):
       task_info = assigned_task.task
       task_strings = []
       task_strings.append("\tTask role: %s, env: %s, name: %s, instance: %s, status: %s on %s" %
-             (scheduled_task.assignedTask.task.owner.role,
-              scheduled_task.assignedTask.task.environment,
-              scheduled_task.assignedTask.task.jobName,
-              scheduled_task.assignedTask.instanceId,
+             (task_info.job.role if task_info.job else task_info.owner.role,
+              task_info.job.environment if task_info.job else task_info.environment,
+              task_info.job.name if task_info.job else task_info.jobName,
+              assigned_task.instanceId,
               ScheduleStatus._VALUES_TO_NAMES[scheduled_task.status],
-              scheduled_task.assignedTask.slaveHost))
+              assigned_task.slaveHost))
 
       if task_info:
         task_strings.append("""\t  cpus: %s, ram: %s MB, disk: %s MB""" % (

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/main/python/apache/aurora/client/cli/task.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/task.py b/src/main/python/apache/aurora/client/cli/task.py
index c41484b..37e0141 100644
--- a/src/main/python/apache/aurora/client/cli/task.py
+++ b/src/main/python/apache/aurora/client/cli/task.py
@@ -114,8 +114,9 @@ class SshCommand(Verb):
         api.cluster, executor_sandbox=context.options.executor_sandbox)
 
     ssh_command = ['ssh', '-t']
-    role = first_task.assignedTask.task.owner.role
-    slave_host = first_task.assignedTask.slaveHost
+    assigned = first_task.assignedTask
+    role = assigned.task.job.role if assigned.task.job else assigned.task.owner.role
+    slave_host = assigned.slaveHost
 
     for tunnel in context.options.tunnels:
       try:
@@ -124,11 +125,11 @@ class SshCommand(Verb):
       except ValueError:
         raise context.CommandError(EXIT_INVALID_PARAMETER,
             'Could not parse tunnel: %s.  Must be of form PORT:NAME' % tunnel)
-      if name not in first_task.assignedTask.assignedPorts:
+      if name not in assigned.assignedPorts:
         raise context.CommandError(EXIT_INVALID_PARAMETER,
-            'Task %s has no port named %s' % (first_task.assignedTask.taskId, name))
+            'Task %s has no port named %s' % (assigned.taskId, name))
       ssh_command += [
-          '-L', '%d:%s:%d' % (port, slave_host, first_task.assignedTask.assignedPorts[name])]
+          '-L', '%d:%s:%d' % (port, slave_host, assigned.assignedPorts[name])]
 
     ssh_command += ['%s@%s' % (context.options.ssh_user or role, slave_host), command]
     return subprocess.call(ssh_command)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/main/python/apache/aurora/client/commands/admin.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/commands/admin.py b/src/main/python/apache/aurora/client/commands/admin.py
index deee025..9719a58 100644
--- a/src/main/python/apache/aurora/client/commands/admin.py
+++ b/src/main/python/apache/aurora/client/commands/admin.py
@@ -268,8 +268,8 @@ def scheduler_print_recovery_tasks(cluster):
   for task in resp.result.queryRecoveryResult.tasks:
     assigned = task.assignedTask
     conf = assigned.task
-    log.info('\t'.join((conf.owner.role,
-                        conf.jobName,
+    log.info('\t'.join((conf.job.role if conf.job else conf.owner.role,
+                        conf.job.name if conf.job else conf.jobName,
                         str(assigned.instanceId),
                         ScheduleStatus._VALUES_TO_NAMES[task.status],
                         assigned.taskId)))

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/main/python/apache/aurora/client/commands/core.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/commands/core.py b/src/main/python/apache/aurora/client/commands/core.py
index 58f419e..e634362 100644
--- a/src/main/python/apache/aurora/client/commands/core.py
+++ b/src/main/python/apache/aurora/client/commands/core.py
@@ -267,11 +267,12 @@ def diff(job_spec, config_file):
 
   pp = pprint.PrettyPrinter(indent=2)
   def pretty_print_task(task):
-    # The raw configuration is not interesting - we only care about what gets parsed.
+  # The raw configuration is not interesting - we only care about what gets parsed.
     task.configuration = None
     task.executorConfig = ExecutorConfig(
         name=AURORA_EXECUTOR_NAME,
         data=json.loads(task.executorConfig.data))
+
     return pp.pformat(vars(task))
 
   def pretty_print_tasks(tasks):
@@ -648,16 +649,17 @@ def status(args, options):
     return taskString
 
   def print_tasks(tasks):
-    for task in tasks:
-      taskString = print_task(task)
+    for scheduled in tasks:
+      taskString = print_task(scheduled)
 
+      assigned = scheduled.assignedTask
       log.info('role: %s, env: %s, name: %s, shard: %s, status: %s on %s\n%s' %
-             (task.assignedTask.task.owner.role,
-              task.assignedTask.task.environment,
-              task.assignedTask.task.jobName,
-              task.assignedTask.instanceId,
-              ScheduleStatus._VALUES_TO_NAMES[task.status],
-              task.assignedTask.slaveHost,
+             (assigned.task.job.role if assigned.task.job else assigned.task.owner.role,
+              assigned.task.job.environment if assigned.task.job else assigned.task.environment,
+              assigned.task.job.name if assigned.task.job else assigned.task.jobName,
+              assigned.instanceId,
+              ScheduleStatus._VALUES_TO_NAMES[scheduled.status],
+              assigned.slaveHost,
               taskString))
 
   api, job_key, _ = LiveJobDisambiguator.disambiguate_args_or_die(

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/main/python/apache/aurora/client/commands/ssh.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/commands/ssh.py b/src/main/python/apache/aurora/client/commands/ssh.py
index d2b8bf6..b0916ed 100644
--- a/src/main/python/apache/aurora/client/commands/ssh.py
+++ b/src/main/python/apache/aurora/client/commands/ssh.py
@@ -82,8 +82,9 @@ def ssh(args, options):
 
   ssh_command = ['ssh', '-t']
 
-  role = first_task.assignedTask.task.owner.role
-  slave_host = first_task.assignedTask.slaveHost
+  assigned = first_task.assignedTask
+  role = assigned.task.job.role if assigned.task.job else assigned.task.owner.role
+  slave_host = assigned.slaveHost
 
   for tunnel in options.tunnels:
     try:
@@ -91,10 +92,10 @@ def ssh(args, options):
       port = int(port)
     except ValueError:
       die('Could not parse tunnel: %s.  Must be of form PORT:NAME' % tunnel)
-    if name not in first_task.assignedTask.assignedPorts:
-      die('Task %s has no port named %s' % (first_task.assignedTask.taskId, name))
+    if name not in assigned.assignedPorts:
+      die('Task %s has no port named %s' % (assigned.taskId, name))
     ssh_command += [
-        '-L', '%d:%s:%d' % (port, slave_host, first_task.assignedTask.assignedPorts[name])]
+        '-L', '%d:%s:%d' % (port, slave_host, assigned.assignedPorts[name])]
 
   ssh_command += ['%s@%s' % (options.ssh_user or role, slave_host), command]
   return subprocess.call(ssh_command)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/main/python/apache/aurora/config/thrift.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/config/thrift.py b/src/main/python/apache/aurora/config/thrift.py
index 9ca806d..ba94ac3 100644
--- a/src/main/python/apache/aurora/config/thrift.py
+++ b/src/main/python/apache/aurora/config/thrift.py
@@ -196,6 +196,7 @@ def convert(job, metadata=frozenset(), ports=frozenset()):
     raise InvalidConfig('Task has invalid resources.  cpu/ramMb/diskMb must all be positive: '
         'cpu:%r ramMb:%r diskMb:%r' % (task.numCpus, task.ramMb, task.diskMb))
 
+  task.job = key
   task.owner = owner
   task.requestedPorts = ports
   task.taskLinks = not_empty_or(job.task_links(), {})

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/main/python/apache/aurora/executor/aurora_executor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/aurora_executor.py b/src/main/python/apache/aurora/executor/aurora_executor.py
index 2c6423d..636b23d 100644
--- a/src/main/python/apache/aurora/executor/aurora_executor.py
+++ b/src/main/python/apache/aurora/executor/aurora_executor.py
@@ -38,7 +38,7 @@ class DefaultSandboxProvider(SandboxProvider):
   def from_assigned_task(self, assigned_task):
     return DirectorySandbox(
         os.path.realpath(self.SANDBOX_NAME),
-        assigned_task.task.owner.role)
+        assigned_task.task.job.role if assigned_task.task.job else assigned_task.task.owner.role)
 
 
 class AuroraExecutor(ExecutorBase, Observable):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/main/python/apache/aurora/executor/common/announcer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/announcer.py b/src/main/python/apache/aurora/executor/common/announcer.py
index 74b2114..9e5bdc3 100644
--- a/src/main/python/apache/aurora/executor/common/announcer.py
+++ b/src/main/python/apache/aurora/executor/common/announcer.py
@@ -110,10 +110,11 @@ class DefaultAnnouncerCheckerProvider(AnnouncerCheckerProvider):
     return KazooClient(self.__ensemble, connection_retry=self.DEFAULT_RETRY_POLICY)
 
   def make_zk_path(self, assigned_task):
+    config = assigned_task.task
     role, environment, name = (
-        assigned_task.task.owner.role,
-        assigned_task.task.environment,
-        assigned_task.task.jobName)
+        config.job.role if config.job else config.owner.role,
+        config.job.environment if config.job else config.environment,
+        config.job.name if config.job else config.jobName)
     return posixpath.join(self.__root, role, environment, name)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/main/python/apache/aurora/executor/thermos_task_runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/thermos_task_runner.py b/src/main/python/apache/aurora/executor/thermos_task_runner.py
index bb99bd1..9a2faa0 100644
--- a/src/main/python/apache/aurora/executor/thermos_task_runner.py
+++ b/src/main/python/apache/aurora/executor/thermos_task_runner.py
@@ -367,7 +367,7 @@ class DefaultThermosTaskRunnerProvider(TaskRunnerProvider):
 
   def from_assigned_task(self, assigned_task, sandbox):
     task_id = assigned_task.taskId
-    role = assigned_task.task.owner.role
+    role = assigned_task.task.job.role if assigned_task.task.job else assigned_task.task.owner.role
     try:
       mesos_task = mesos_task_instance_from_assigned_task(assigned_task)
     except ValueError as e:

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/test/python/apache/aurora/client/api/test_instance_watcher.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_instance_watcher.py b/src/test/python/apache/aurora/client/api/test_instance_watcher.py
index ae1b24b..abbbdbe 100644
--- a/src/test/python/apache/aurora/client/api/test_instance_watcher.py
+++ b/src/test/python/apache/aurora/client/api/test_instance_watcher.py
@@ -23,7 +23,6 @@ from apache.aurora.client.api.instance_watcher import InstanceWatcher
 from gen.apache.aurora.api.AuroraSchedulerManager import Client as scheduler_client
 from gen.apache.aurora.api.ttypes import (
     AssignedTask,
-    Identity,
     JobKey,
     Response,
     ResponseCode,
@@ -72,16 +71,13 @@ class InstanceWatcherTest(unittest.TestCase):
   EXPECTED_CYCLES = find_expected_cycles(WATCH_SECS, 3.0)
 
   def setUp(self):
-    self._role = 'mesos'
-    self._env = 'test'
-    self._name = 'jimbob'
     self._clock = FakeClock()
     self._event = FakeEvent(self._clock)
     self._scheduler = mox.MockObject(scheduler_client)
-    job_key = JobKey(name=self._name, environment=self._env, role=self._role)
+    self._job_key = JobKey(role='mesos', name='jimbob', environment='test')
     self._health_check = mox.MockObject(HealthCheck)
     self._watcher = InstanceWatcher(self._scheduler,
-                                 job_key,
+                                 self._job_key,
                                  self.RESTART_THRESHOLD,
                                  self.WATCH_SECS,
                                  health_check_interval_seconds=3,
@@ -90,9 +86,7 @@ class InstanceWatcherTest(unittest.TestCase):
 
   def get_tasks_status_query(self, instance_ids):
     query = TaskQuery()
-    query.owner = Identity(role=self._role)
-    query.environment = self._env
-    query.jobName = self._name
+    query.jobKeys = set([self._job_key])
     query.statuses = set([ScheduleStatus.RUNNING])
     query.instanceIds = set(instance_ids)
     return query
@@ -108,7 +102,7 @@ class InstanceWatcherTest(unittest.TestCase):
     response.result.scheduleStatusResult = ScheduleStatusResult(tasks=tasks)
 
     query = self.get_tasks_status_query(instance_ids)
-    for x in range(int(num_calls)):
+    for _ in range(int(num_calls)):
       self._scheduler.getTasksWithoutConfigs(query).AndReturn(response)
 
   def expect_io_error_in_get_statuses(self, instance_ids=WATCH_INSTANCES,
@@ -119,14 +113,14 @@ class InstanceWatcherTest(unittest.TestCase):
     response.result.scheduleStatusResult = ScheduleStatusResult(tasks=tasks)
 
     query = self.get_tasks_status_query(instance_ids)
-    for x in range(int(num_calls)):
+    for _ in range(int(num_calls)):
       self._scheduler.getTasksWithoutConfigs(query).AndRaise(IOError('oops'))
 
   def mock_health_check(self, task, status, retry):
     self._health_check.health(task).InAnyOrder().AndReturn((status, retry))
 
   def expect_health_check(self, instance, status, retry=True, num_calls=EXPECTED_CYCLES):
-    for x in range(int(num_calls)):
+    for _ in range(int(num_calls)):
       self.mock_health_check(self.create_task(instance), status, retry)
 
   def assert_watch_result(self, expected_failed_instances, instances_to_watch=WATCH_INSTANCES):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/test/python/apache/aurora/client/api/test_sla.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_sla.py b/src/test/python/apache/aurora/client/api/test_sla.py
index 1117f24..50a6c47 100644
--- a/src/test/python/apache/aurora/client/api/test_sla.py
+++ b/src/test/python/apache/aurora/client/api/test_sla.py
@@ -27,6 +27,7 @@ from gen.apache.aurora.api.constants import LIVE_STATES
 from gen.apache.aurora.api.ttypes import (
     AssignedTask,
     Identity,
+    JobKey,
     Response,
     ResponseCode,
     Result,
@@ -67,6 +68,7 @@ class SlaTest(unittest.TestCase):
             slaveHost=host,
             task=TaskConfig(
                 production=prod if prod is not None else True,
+                job=JobKey(role=self._role, environment=self._env, name=name or self._name),
                 jobName=name or self._name,
                 owner=Identity(role=self._role),
                 environment=self._env)),
@@ -162,13 +164,9 @@ class SlaTest(unittest.TestCase):
     )
 
   def expect_task_status_call_job_scoped(self):
-    self._scheduler.getTasksWithoutConfigs.assert_called_once_with(
-        TaskQuery(
-            owner=Identity(role=self._role),
-            environment=self._env,
-            jobName=self._name,
-            statuses=LIVE_STATES)
-    )
+    self._scheduler.getTasksWithoutConfigs.assert_called_once_with(TaskQuery(
+        jobKeys=[self._job_key.to_thrift()],
+        statuses=LIVE_STATES))
 
   def expect_task_status_call_cluster_scoped(self):
     self._scheduler.getTasksWithoutConfigs.assert_called_with(TaskQuery(statuses=LIVE_STATES))

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/test/python/apache/aurora/client/cli/test_api_from_cli.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_api_from_cli.py b/src/test/python/apache/aurora/client/cli/test_api_from_cli.py
index 95aa649..4de8bb4 100644
--- a/src/test/python/apache/aurora/client/cli/test_api_from_cli.py
+++ b/src/test/python/apache/aurora/client/cli/test_api_from_cli.py
@@ -23,19 +23,13 @@ from apache.aurora.client.cli.util import AuroraClientCommandTest
 
 from gen.apache.aurora.api import AuroraAdmin
 from gen.apache.aurora.api.ttypes import (
-    AssignedTask,
     GetJobsResult,
-    Identity,
     JobConfiguration,
     JobKey,
     Response,
     ResponseCode,
     Result,
-    ScheduledTask,
-    ScheduleStatus,
     ScheduleStatusResult,
-    TaskConfig,
-    TaskEvent,
     TaskQuery
 )
 
@@ -46,35 +40,6 @@ class TestApiFromCLI(AuroraClientCommandTest):
   """
 
   @classmethod
-  def create_mock_scheduled_tasks(cls):
-    tasks = []
-    for name in ['foo', 'bar', 'baz']:
-      task = Mock(spec=ScheduledTask)
-      task.key = JobKey(role=cls.TEST_ROLE, environment=cls.TEST_ENV, name=name)
-      task.failure_count = 0
-      task.assignedTask = Mock(spec=AssignedTask)
-      task.assignedTask.slaveHost = 'slavehost'
-      task.assignedTask.task = Mock(spec=TaskConfig)
-      task.assignedTask.task.maxTaskFailures = 1
-      task.assignedTask.task.metadata = []
-      task.assignedTask.task.owner = Identity(role='bozo')
-      task.assignedTask.task.environment = 'test'
-      task.assignedTask.task.jobName = 'woops'
-      task.assignedTask.task.numCpus = 2
-      task.assignedTask.task.ramMb = 2
-      task.assignedTask.task.diskMb = 2
-      task.assignedTask.instanceId = 4237894
-      task.assignedTask.assignedPorts = None
-      task.status = ScheduleStatus.RUNNING
-      mockEvent = Mock(spec=TaskEvent)
-      mockEvent.timestamp = 28234726395
-      mockEvent.status = ScheduleStatus.RUNNING
-      mockEvent.message = "Hi there"
-      task.taskEvents = [mockEvent]
-      tasks.append(task)
-    return tasks
-
-  @classmethod
   def create_mock_scheduled_task_no_metadata(cls):
     result = cls.create_mock_scheduled_tasks()
     for task in result:
@@ -104,7 +69,7 @@ class TestApiFromCLI(AuroraClientCommandTest):
   def create_status_response(cls):
     resp = cls.create_simple_success_response()
     resp.result.scheduleStatusResult = Mock(spec=ScheduleStatusResult)
-    resp.result.scheduleStatusResult.tasks = set(cls.create_mock_scheduled_tasks())
+    resp.result.scheduleStatusResult.tasks = set(cls.create_scheduled_tasks())
     return resp
 
   @classmethod

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/test/python/apache/aurora/client/cli/test_diff.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_diff.py b/src/test/python/apache/aurora/client/cli/test_diff.py
index 1081769..78694d7 100644
--- a/src/test/python/apache/aurora/client/cli/test_diff.py
+++ b/src/test/python/apache/aurora/client/cli/test_diff.py
@@ -24,17 +24,11 @@ from apache.aurora.client.cli.util import AuroraClientCommandTest
 
 from gen.apache.aurora.api.constants import ACTIVE_STATES
 from gen.apache.aurora.api.ttypes import (
-    AssignedTask,
-    ExecutorConfig,
-    Identity,
     JobConfiguration,
     JobKey,
     PopulateJobResult,
     ResponseCode,
-    ScheduleStatus,
     ScheduleStatusResult,
-    TaskConfig,
-    TaskEvent,
     TaskQuery
 )
 
@@ -44,7 +38,6 @@ class TestDiffCommand(AuroraClientCommandTest):
   def setup_mock_options(cls):
     """set up to get a mock options object."""
     mock_options = Mock()
-    mock_options = Mock()
     mock_options.env = None
     mock_options.json = False
     mock_options.bindings = {}
@@ -54,41 +47,10 @@ class TestDiffCommand(AuroraClientCommandTest):
     return mock_options
 
   @classmethod
-  def create_mock_scheduled_tasks(cls):
-    jobs = []
-    for name in ['foo', 'bar', 'baz']:
-      job = Mock()
-      job.key = JobKey(role=cls.TEST_ROLE, environment=cls.TEST_ENV, name=name)
-      job.failure_count = 0
-      job.assignedTask = Mock(spec=AssignedTask)
-      job.assignedTask.slaveHost = 'slavehost'
-      job.assignedTask.task = Mock(spec=TaskConfig)
-      job.assignedTask.task.maxTaskFailures = 1
-      job.assignedTask.task.executorConfig = Mock(spec=ExecutorConfig)
-      job.assignedTask.task.executorConfig.data = Mock()
-      job.assignedTask.task.metadata = []
-      job.assignedTask.task.owner = Identity(role='bozo')
-      job.assignedTask.task.environment = 'test'
-      job.assignedTask.task.jobName = 'woops'
-      job.assignedTask.task.numCpus = 2
-      job.assignedTask.task.ramMb = 2
-      job.assignedTask.task.diskMb = 2
-      job.assignedTask.instanceId = 4237894
-      job.assignedTask.assignedPorts = None
-      job.status = ScheduleStatus.RUNNING
-      mockEvent = Mock(spec=TaskEvent)
-      mockEvent.timestamp = 28234726395
-      mockEvent.status = ScheduleStatus.RUNNING
-      mockEvent.message = "Hi there"
-      job.taskEvents = [mockEvent]
-      jobs.append(job)
-    return jobs
-
-  @classmethod
   def create_status_response(cls):
     resp = cls.create_simple_success_response()
     resp.result.scheduleStatusResult = Mock(spec=ScheduleStatusResult)
-    resp.result.scheduleStatusResult.tasks = set(cls.create_mock_scheduled_tasks())
+    resp.result.scheduleStatusResult.tasks = set(cls.create_scheduled_tasks())
     return resp
 
   @classmethod
@@ -100,7 +62,8 @@ class TestDiffCommand(AuroraClientCommandTest):
     populate = cls.create_simple_success_response()
     populate.result.populateJobResult = Mock(spec=PopulateJobResult)
     api.populateJobConfig.return_value = populate
-    populate.result.populateJobResult.populatedDEPRECATED = cls.create_mock_scheduled_tasks()
+    tasks = set(task.assignedTask.task for task in cls.create_scheduled_tasks())
+    populate.result.populateJobResult.populatedDEPRECATED = tasks
     return populate
 
   def test_successful_diff(self):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/test/python/apache/aurora/client/cli/test_status.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_status.py b/src/test/python/apache/aurora/client/cli/test_status.py
index 49ac2a4..8894a1e 100644
--- a/src/test/python/apache/aurora/client/cli/test_status.py
+++ b/src/test/python/apache/aurora/client/cli/test_status.py
@@ -43,45 +43,22 @@ from gen.apache.aurora.api.ttypes import (
 
 class TestJobStatus(AuroraClientCommandTest):
   @classmethod
-  def create_mock_scheduled_tasks(cls):
-    tasks = []
+  def create_scheduled_tasks(cls):
+    tasks = AuroraClientCommandTest.create_scheduled_tasks()
     instance = 0
-    for name in ['foo', 'bar', 'baz']:
+    for task in tasks:
       instance += 1
-      event = TaskEvent(
-        timestamp=28234726395,
-        status=ScheduleStatus.RUNNING,
-        message="Hi there"
-      )
-      task = ScheduledTask(
-        failureCount=0,
-        assignedTask=AssignedTask(
-          slaveHost='slavehost',
-          task=TaskConfig(
-            maxTaskFailures=1,
-            metadata=[],
-            owner=Identity(role='bozo'),
-            environment='test',
-            jobName='woops',
-            numCpus=2,
-            ramMb=2,
-            diskMb=2
-          ),
-          instanceId=instance,
-          assignedPorts=None,
-        ),
-        status=ScheduleStatus.RUNNING,
-        taskEvents=[event]
-      )
-      tasks.append(task)
+      task.assignedTask.instanceId = instance
+      task.assignedTask.task.job = JobKey(role='bozo', environment='test', name='woops')
+      task.assignedTask.task.jobName = 'woops'
     return tasks
 
   @classmethod
-  def create_mock_inactive_tasks(cls):
-    jobs = []
+  def create_inactive_tasks(cls):
     instance = 0
     INACTIVE_STATUSES = [ScheduleStatus.KILLED, ScheduleStatus.FINISHED, ScheduleStatus.FAILED]
-    for instance in range(3):
+    tasks = cls.create_scheduled_tasks()
+    for task in tasks:
       events = []
       for i in range(3):
         event = TaskEvent(
@@ -89,36 +66,22 @@ class TestJobStatus(AuroraClientCommandTest):
           status=INACTIVE_STATUSES[i],
           message="Hi there")
         events.append(event)
-      job = ScheduledTask(
-        failureCount=3,
-        assignedTask=AssignedTask(
-          slaveHost='slavehost',
-          task=TaskConfig(
-            maxTaskFailures=1,
-            metadata=[],
-            owner=Identity(role='bozo'),
-            environment='test',
-            jobName='woops',
-            numCpus=2,
-            ramMb=2,
-            diskMb=2),
-          instanceId=instance,
-          assignedPorts=None),
-        status=INACTIVE_STATUSES[instance],
-        taskEvents=events)
-      jobs.append(job)
-    return set(jobs)
+      task.taskEvents = events
+      task.status = INACTIVE_STATUSES[instance]
+      task.assignedTask.instanceId = instance
+      instance += 1
+    return set(tasks)
 
   @classmethod
   def create_mock_scheduled_task_no_metadata(cls):
-    result = cls.create_mock_scheduled_tasks()
+    result = cls.create_scheduled_tasks()
     for job in result:
       job.assignedTask.task.metadata = None
     return result
 
   @classmethod
   def create_mock_scheduled_task_with_metadata(cls):
-    result = cls.create_mock_scheduled_tasks()
+    result = cls.create_scheduled_tasks()
     for job in result:
       job.assignedTask.task.metadata = [Metadata("meta", "data"), Metadata("data", "meta")]
     return result
@@ -144,7 +107,7 @@ class TestJobStatus(AuroraClientCommandTest):
   def create_status_response(cls):
     resp = cls.create_simple_success_response()
     resp.result.scheduleStatusResult = ScheduleStatusResult(
-      tasks=set(cls.create_mock_scheduled_tasks()))
+      tasks=set(cls.create_scheduled_tasks()))
     return resp
 
   @classmethod
@@ -157,7 +120,7 @@ class TestJobStatus(AuroraClientCommandTest):
   @classmethod
   def create_status_with_inactives(cls):
     resp = cls.create_status_null_metadata()
-    resp.result.scheduleStatusResult.tasks |= cls.create_mock_inactive_tasks()
+    resp.result.scheduleStatusResult.tasks |= cls.create_inactive_tasks()
     return resp
 
   @classmethod
@@ -182,6 +145,7 @@ class TestJobStatus(AuroraClientCommandTest):
           slaveId="random_machine_id",
           slaveHost="junk.nothing",
           task=TaskConfig(
+            job=JobKey(role="nobody", environment="prod", name='flibber'),
             owner=Identity(role="nobody"),
             environment="prod",
             jobName="flibber",
@@ -496,6 +460,11 @@ class TestJobStatus(AuroraClientCommandTest):
                     "owner": {
                       "role": "nobody"
                     },
+                    "job": {
+                      "environment": "prod",
+                      "role": "nobody",
+                      "name": "flibber"
+                    },
                     "production": false,
                     "diskMb": 4096,
                     "ramMb": 2048,
@@ -544,6 +513,11 @@ class TestJobStatus(AuroraClientCommandTest):
                     "owner": {
                       "role": "nobody"
                     },
+                    "job": {
+                      "environment": "prod",
+                      "role": "nobody",
+                      "name": "flibber"
+                    },
                     "production": false,
                     "diskMb": 4096,
                     "ramMb": 2048,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/test/python/apache/aurora/client/cli/test_task_run.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_task_run.py b/src/test/python/apache/aurora/client/cli/test_task_run.py
index 16fde14..12163df 100644
--- a/src/test/python/apache/aurora/client/cli/test_task_run.py
+++ b/src/test/python/apache/aurora/client/cli/test_task_run.py
@@ -21,14 +21,10 @@ from apache.aurora.client.cli.client import AuroraCommandLine
 from apache.aurora.client.cli.util import AuroraClientCommandTest
 
 from gen.apache.aurora.api.ttypes import (
-    AssignedTask,
-    Identity,
     JobKey,
     ResponseCode,
     ScheduleStatus,
     ScheduleStatusResult,
-    TaskConfig,
-    TaskEvent,
     TaskQuery
 )
 
@@ -42,41 +38,10 @@ def mock_log(level, msg):
 class TestRunCommand(AuroraClientCommandTest):
 
   @classmethod
-  def create_mock_scheduled_tasks(cls):
-    jobs = []
-    for name in ['foo', 'bar', 'baz']:
-      job = Mock()
-      job.key = JobKey(role=cls.TEST_ROLE, environment=cls.TEST_ENV, name=name)
-      job.failure_count = 0
-      job.assignedTask = Mock(spec=AssignedTask)
-      job.assignedTask.taskId = 1287391823
-      job.assignedTask.slaveHost = 'slavehost'
-      job.assignedTask.task = Mock(spec=TaskConfig)
-      job.assignedTask.task.executorConfig = Mock()
-      job.assignedTask.task.maxTaskFailures = 1
-      job.assignedTask.task.metadata = []
-      job.assignedTask.task.owner = Identity(role='bozo')
-      job.assignedTask.task.environment = 'test'
-      job.assignedTask.task.jobName = 'woops'
-      job.assignedTask.task.numCpus = 2
-      job.assignedTask.task.ramMb = 2
-      job.assignedTask.task.diskMb = 2
-      job.assignedTask.instanceId = 4237894
-      job.assignedTask.assignedPorts = {}
-      job.status = ScheduleStatus.RUNNING
-      mockEvent = Mock(spec=TaskEvent)
-      mockEvent.timestamp = 28234726395
-      mockEvent.status = ScheduleStatus.RUNNING
-      mockEvent.message = "Hi there"
-      job.taskEvents = [mockEvent]
-      jobs.append(job)
-    return jobs
-
-  @classmethod
   def create_status_response(cls):
     resp = cls.create_simple_success_response()
     resp.result.scheduleStatusResult = Mock(spec=ScheduleStatusResult)
-    resp.result.scheduleStatusResult.tasks = cls.create_mock_scheduled_tasks()
+    resp.result.scheduleStatusResult.tasks = cls.create_scheduled_tasks()
     return resp
 
   @classmethod
@@ -146,42 +111,12 @@ class TestRunCommand(AuroraClientCommandTest):
 
 
 class TestSshCommand(AuroraClientCommandTest):
-  @classmethod
-  def create_mock_scheduled_tasks(cls):
-    jobs = []
-    for name in ['foo', 'bar', 'baz']:
-      job = Mock()
-      job.key = JobKey(role=cls.TEST_ROLE, environment=cls.TEST_ENV, name=name)
-      job.failure_count = 0
-      job.assignedTask = Mock(spec=AssignedTask)
-      job.assignedTask.taskId = 1287391823
-      job.assignedTask.slaveHost = 'slavehost'
-      job.assignedTask.task = Mock(spec=TaskConfig)
-      job.assignedTask.task.executorConfig = Mock()
-      job.assignedTask.task.maxTaskFailures = 1
-      job.assignedTask.task.metadata = []
-      job.assignedTask.task.owner = Identity(role='bozo')
-      job.assignedTask.task.environment = 'test'
-      job.assignedTask.task.jobName = 'woops'
-      job.assignedTask.task.numCpus = 2
-      job.assignedTask.task.ramMb = 2
-      job.assignedTask.task.diskMb = 2
-      job.assignedTask.instanceId = 4237894
-      job.assignedTask.assignedPorts = {}
-      job.status = ScheduleStatus.RUNNING
-      mockEvent = Mock(spec=TaskEvent)
-      mockEvent.timestamp = 28234726395
-      mockEvent.status = ScheduleStatus.RUNNING
-      mockEvent.message = "Hi there"
-      job.taskEvents = [mockEvent]
-      jobs.append(job)
-    return jobs
 
   @classmethod
   def create_status_response(cls):
     resp = cls.create_simple_success_response()
     resp.result.scheduleStatusResult = Mock(spec=ScheduleStatusResult)
-    resp.result.scheduleStatusResult.tasks = cls.create_mock_scheduled_tasks()
+    resp.result.scheduleStatusResult.tasks = cls.create_scheduled_tasks()
     return resp
 
   @classmethod

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/test/python/apache/aurora/client/cli/test_update.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_update.py b/src/test/python/apache/aurora/client/cli/test_update.py
index 1ec5483..08f277e 100644
--- a/src/test/python/apache/aurora/client/cli/test_update.py
+++ b/src/test/python/apache/aurora/client/cli/test_update.py
@@ -120,7 +120,11 @@ class TestUpdateCommand(AuroraClientCommandTest):
     populate = cls.create_simple_success_response()
     populate.result.populateJobResult = Mock(spec=PopulateJobResult)
     api.populateJobConfig.return_value = populate
-    configs = [TaskConfig(numCpus=1.0, ramMb=1, diskMb=1) for i in range(count)]
+    configs = [TaskConfig(
+        numCpus=1.0,
+        ramMb=1,
+        diskMb=1,
+        job=JobKey(role='bozo', environment='test', name='hello')) for i in range(count)]
     populate.result.populateJobResult.populatedDEPRECATED = set(configs)
     return populate
 
@@ -147,7 +151,12 @@ class TestUpdateCommand(AuroraClientCommandTest):
     scheduler.getTasksWithoutConfigs.return_value = status_response
     schedule_status = Mock(spec=ScheduleStatusResult)
     status_response.result.scheduleStatusResult = schedule_status
-    task_config = TaskConfig(numCpus=1.0, ramMb=10, diskMb=1)
+    task_config = TaskConfig(
+        numCpus=1.0,
+        ramMb=10,
+        diskMb=1,
+        job=JobKey(role='bozo', environment='test', name='hello'))
+
     # This should be a list of ScheduledTask's.
     schedule_status.tasks = []
     for i in range(20):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/test/python/apache/aurora/client/cli/util.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/util.py b/src/test/python/apache/aurora/client/cli/util.py
index 3fa609a..796c4f9 100644
--- a/src/test/python/apache/aurora/client/cli/util.py
+++ b/src/test/python/apache/aurora/client/cli/util.py
@@ -25,6 +25,9 @@ from apache.aurora.common.clusters import Clusters
 
 from gen.apache.aurora.api.ttypes import (
     AssignedTask,
+    ExecutorConfig,
+    Identity,
+    JobKey,
     Response,
     ResponseCode,
     Result,
@@ -126,7 +129,6 @@ class AuroraClientCommandTest(unittest.TestCase):
   @classmethod
   def create_mock_api(cls):
     """Builds up a mock API object, with a mock SchedulerProxy"""
-    mock_api = Mock(spec=HookedAuroraClientAPI)
     mock_scheduler = Mock()
     mock_scheduler.url = "http://something_or_other"
     mock_scheduler_client = Mock()
@@ -175,6 +177,38 @@ class AuroraClientCommandTest(unittest.TestCase):
     return mock_task
 
   @classmethod
+  def create_scheduled_tasks(cls):
+    tasks = []
+    for name in ['foo', 'bar', 'baz']:
+      task = ScheduledTask()
+      task.failure_count = 0
+      task.assignedTask = AssignedTask()
+      task.assignedTask.taskId = 1287391823
+      task.assignedTask.slaveHost = 'slavehost'
+      task.assignedTask.task = TaskConfig()
+      task.assignedTask.task.maxTaskFailures = 1
+      task.assignedTask.task.executorConfig = ExecutorConfig()
+      task.assignedTask.task.executorConfig.data = Mock()
+      task.assignedTask.task.metadata = []
+      task.assignedTask.task.job = JobKey(role=cls.TEST_ROLE, environment=cls.TEST_ENV, name=name)
+      task.assignedTask.task.owner = Identity(role=cls.TEST_ROLE)
+      task.assignedTask.task.environment = cls.TEST_ENV
+      task.assignedTask.task.jobName = name
+      task.assignedTask.task.numCpus = 2
+      task.assignedTask.task.ramMb = 2
+      task.assignedTask.task.diskMb = 2
+      task.assignedTask.instanceId = 4237894
+      task.assignedTask.assignedPorts = {}
+      task.status = ScheduleStatus.RUNNING
+      event = TaskEvent()
+      event.timestamp = 28234726395
+      event.status = ScheduleStatus.RUNNING
+      event.message = "Hi there"
+      task.taskEvents = [event]
+      tasks.append(task)
+    return tasks
+
+  @classmethod
   def setup_get_tasks_status_calls(cls, scheduler):
     status_response = cls.create_status_call_result()
     scheduler.getTasksWithoutConfigs.return_value = status_response

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/test/python/apache/aurora/client/commands/test_diff.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/commands/test_diff.py b/src/test/python/apache/aurora/client/commands/test_diff.py
index c8d0145..9f1d459 100644
--- a/src/test/python/apache/aurora/client/commands/test_diff.py
+++ b/src/test/python/apache/aurora/client/commands/test_diff.py
@@ -31,6 +31,7 @@ from gen.apache.aurora.api.ttypes import (
     JobKey,
     PopulateJobResult,
     ResponseCode,
+    ScheduledTask,
     ScheduleStatus,
     ScheduleStatusResult,
     TaskConfig,
@@ -44,7 +45,6 @@ class TestDiffCommand(AuroraClientCommandTest):
   def setup_mock_options(cls):
     """set up to get a mock options object."""
     mock_options = Mock()
-    mock_options = Mock()
     mock_options.env = None
     mock_options.json = False
     mock_options.bindings = {}
@@ -58,8 +58,7 @@ class TestDiffCommand(AuroraClientCommandTest):
   def create_mock_scheduled_tasks(cls):
     jobs = []
     for name in ['foo', 'bar', 'baz']:
-      job = Mock()
-      job.key = JobKey(role=cls.TEST_ROLE, environment=cls.TEST_ENV, name=name)
+      job = Mock(spec=ScheduledTask)
       job.failure_count = 0
       job.assignedTask = Mock(spec=AssignedTask)
       job.assignedTask.slaveHost = 'slavehost'
@@ -68,6 +67,7 @@ class TestDiffCommand(AuroraClientCommandTest):
       job.assignedTask.task.executorConfig = Mock(spec=ExecutorConfig)
       job.assignedTask.task.executorConfig.data = Mock()
       job.assignedTask.task.metadata = []
+      job.assignedTask.task.job = JobKey(role=cls.TEST_ROLE, environment=cls.TEST_ENV, name=name)
       job.assignedTask.task.owner = Identity(role='mchucarroll')
       job.assignedTask.task.environment = 'test'
       job.assignedTask.task.jobName = 'woops'
@@ -101,7 +101,8 @@ class TestDiffCommand(AuroraClientCommandTest):
     populate = cls.create_simple_success_response()
     populate.result.populateJobResult = Mock(spec=PopulateJobResult)
     api.populateJobConfig.return_value = populate
-    populate.result.populateJobResult.populatedDEPRECATED = cls.create_mock_scheduled_tasks()
+    tasks = set(task.assignedTask.task for task in cls.create_mock_scheduled_tasks())
+    populate.result.populateJobResult.populatedDEPRECATED = tasks
     return populate
 
   def test_successful_diff(self):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/test/python/apache/aurora/client/commands/test_ssh.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/commands/test_ssh.py b/src/test/python/apache/aurora/client/commands/test_ssh.py
index abb657b..cf9f425 100644
--- a/src/test/python/apache/aurora/client/commands/test_ssh.py
+++ b/src/test/python/apache/aurora/client/commands/test_ssh.py
@@ -49,8 +49,9 @@ class TestSshCommand(AuroraClientCommandTest):
   def create_mock_scheduled_tasks(cls):
     jobs = []
     for name in ['foo', 'bar', 'baz']:
+      job_key = JobKey(role=cls.TEST_ROLE, environment=cls.TEST_ENV, name=name)
       job = Mock()
-      job.key = JobKey(role=cls.TEST_ROLE, environment=cls.TEST_ENV, name=name)
+      job.key = job_key
       job.failure_count = 0
       job.assignedTask = Mock(spec=AssignedTask)
       job.assignedTask.taskId = 1287391823
@@ -59,9 +60,10 @@ class TestSshCommand(AuroraClientCommandTest):
       job.assignedTask.task.executorConfig = Mock()
       job.assignedTask.task.maxTaskFailures = 1
       job.assignedTask.task.metadata = []
-      job.assignedTask.task.owner = Identity(role='mchucarroll')
-      job.assignedTask.task.environment = 'test'
-      job.assignedTask.task.jobName = 'woops'
+      job.assignedTask.task.job = job_key
+      job.assignedTask.task.owner = Identity(role=cls.TEST_ROLE)
+      job.assignedTask.task.environment = cls.TEST_ENV
+      job.assignedTask.task.jobName = name
       job.assignedTask.task.numCpus = 2
       job.assignedTask.task.ramMb = 2
       job.assignedTask.task.diskMb = 2

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/test/python/apache/aurora/client/commands/test_status.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/commands/test_status.py b/src/test/python/apache/aurora/client/commands/test_status.py
index 6397635..9eb8def 100644
--- a/src/test/python/apache/aurora/client/commands/test_status.py
+++ b/src/test/python/apache/aurora/client/commands/test_status.py
@@ -46,17 +46,19 @@ class TestListJobs(AuroraClientCommandTest):
   def create_mock_scheduled_tasks(cls):
     jobs = []
     for name in ['foo', 'bar', 'baz']:
+      job_key = JobKey(role=cls.TEST_ROLE, environment=cls.TEST_ENV, name=name)
       job = Mock()
-      job.key = JobKey(role=cls.TEST_ROLE, environment=cls.TEST_ENV, name=name)
+      job.key = job_key
       job.failure_count = 0
       job.assignedTask = Mock(spec=AssignedTask)
       job.assignedTask.slaveHost = 'slavehost'
       job.assignedTask.task = Mock(spec=TaskConfig)
       job.assignedTask.task.maxTaskFailures = 1
       job.assignedTask.task.metadata = []
-      job.assignedTask.task.owner = Identity(role='mchucarroll')
-      job.assignedTask.task.environment = 'test'
-      job.assignedTask.task.jobName = 'woops'
+      job.assignedTask.task.job = job_key
+      job.assignedTask.task.owner = Identity(role=cls.TEST_ROLE)
+      job.assignedTask.task.environment = cls.TEST_ENV
+      job.assignedTask.task.jobName = name
       job.assignedTask.task.numCpus = 2
       job.assignedTask.task.ramMb = 2
       job.assignedTask.task.diskMb = 2

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/test/python/apache/aurora/client/commands/test_update.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/commands/test_update.py b/src/test/python/apache/aurora/client/commands/test_update.py
index 07cbe53..555ea0d 100644
--- a/src/test/python/apache/aurora/client/commands/test_update.py
+++ b/src/test/python/apache/aurora/client/commands/test_update.py
@@ -148,7 +148,12 @@ class TestUpdateCommand(AuroraClientCommandTest):
     api.populateJobConfig.return_value = populate
     configs = []
     for _ in range(20):
-      task_config = TaskConfig(numCpus=1.0, ramMb=1, diskMb=1)
+      task_config = TaskConfig(
+          numCpus=1.0,
+          ramMb=1,
+          diskMb=1,
+          job=JobKey(role='mchucarroll', environment='test', name='hello'))
+
       configs.append(task_config)
     populate.result.populateJobResult.populatedDEPRECATED = set(configs)
     return populate
@@ -176,7 +181,12 @@ class TestUpdateCommand(AuroraClientCommandTest):
     scheduler_proxy.getTasksWithoutConfigs.return_value = status_response
     schedule_status = Mock(spec=ScheduleStatusResult)
     status_response.result.scheduleStatusResult = schedule_status
-    task_config = TaskConfig(numCpus=1.0, ramMb=10, diskMb=1)
+    task_config = TaskConfig(
+        numCpus=1.0,
+        ramMb=10,
+        diskMb=1,
+        job=JobKey(role='mchucarroll', environment='test', name='hello'))
+
     # This should be a list of ScheduledTask's.
     schedule_status.tasks = []
     for i in range(20):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/test/python/apache/aurora/config/test_thrift.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/config/test_thrift.py b/src/test/python/apache/aurora/config/test_thrift.py
index 1dd9e79..654c0b5 100644
--- a/src/test/python/apache/aurora/config/test_thrift.py
+++ b/src/test/python/apache/aurora/config/test_thrift.py
@@ -44,14 +44,16 @@ HELLO_WORLD = Job(
 
 def test_simple_config():
   job = convert_pystachio_to_thrift(HELLO_WORLD)
+  expected_key = JobKey(
+      role=HELLO_WORLD.role().get(),
+      environment=HELLO_WORLD.environment().get(),
+      name=HELLO_WORLD.name().get())
   assert job.instanceCount == 1
   tti = job.taskConfig
-  assert job.key == JobKey(
-    role=HELLO_WORLD.role().get(),
-    environment=HELLO_WORLD.environment().get(),
-    name=HELLO_WORLD.name().get())
+  assert job.key == expected_key
   assert job.owner == Identity(role=HELLO_WORLD.role().get(), user=getpass.getuser())
   assert job.cronSchedule is None
+  assert tti.job == expected_key
   assert tti.jobName == 'hello_world'
   assert tti.isService is False
   assert tti.numCpus == 0.1

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/test/python/apache/aurora/executor/common/test_announcer.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_announcer.py b/src/test/python/apache/aurora/executor/common/test_announcer.py
index 5694335..a4ab532 100644
--- a/src/test/python/apache/aurora/executor/common/test_announcer.py
+++ b/src/test/python/apache/aurora/executor/common/test_announcer.py
@@ -188,11 +188,21 @@ def test_announcer_under_abnormal_circumstances():
 
 def make_assigned_task(thermos_config, assigned_ports=None):
   from gen.apache.aurora.api.constants import AURORA_EXECUTOR_NAME
-  from gen.apache.aurora.api.ttypes import AssignedTask, ExecutorConfig, Identity, TaskConfig
+  from gen.apache.aurora.api.ttypes import (
+      AssignedTask,
+      ExecutorConfig,
+      Identity,
+      JobKey,
+      TaskConfig
+  )
 
   assigned_ports = assigned_ports or {}
   executor_config = ExecutorConfig(name=AURORA_EXECUTOR_NAME, data=thermos_config.json_dumps())
   task_config = TaskConfig(
+      job=JobKey(
+          role=thermos_config.role().get(),
+          environment="prod",
+          name=thermos_config.name().get()),
       owner=Identity(role=thermos_config.role().get(), user=thermos_config.role().get()),
       environment=thermos_config.environment().get(),
       jobName=thermos_config.name().get(),

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/06935c04/src/test/python/apache/aurora/executor/test_thermos_executor.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/test_thermos_executor.py b/src/test/python/apache/aurora/executor/test_thermos_executor.py
index 65e8cce..16a4011 100644
--- a/src/test/python/apache/aurora/executor/test_thermos_executor.py
+++ b/src/test/python/apache/aurora/executor/test_thermos_executor.py
@@ -55,7 +55,7 @@ from apache.thermos.core.runner import TaskRunner
 from apache.thermos.monitoring.monitor import TaskMonitor
 
 from gen.apache.aurora.api.constants import AURORA_EXECUTOR_NAME
-from gen.apache.aurora.api.ttypes import AssignedTask, ExecutorConfig, Identity, TaskConfig
+from gen.apache.aurora.api.ttypes import AssignedTask, ExecutorConfig, Identity, JobKey, TaskConfig
 
 if 'THERMOS_DEBUG' in os.environ:
   LogOptions.set_stderr_log_level('google:DEBUG')
@@ -137,6 +137,7 @@ def make_task(thermos_config, assigned_ports={}, **kw):
           executorConfig=ExecutorConfig(
               name=AURORA_EXECUTOR_NAME,
               data=thermos_config.json_dumps()),
+          job=JobKey(role=role, environment='env', name='name'),
           owner=Identity(role=role, user=role)),
       assignedPorts=assigned_ports,
       **kw)