You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2017/03/02 23:07:33 UTC

[1/2] aurora git commit: Enable Mesos HTTP API.

Repository: aurora
Updated Branches:
  refs/heads/master 2652fe02a -> 705dbc7cd


http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
index c599fe3..b132cde 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
@@ -14,33 +14,14 @@
 package org.apache.aurora.scheduler.mesos;
 
 import java.nio.charset.StandardCharsets;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.net.InetAddresses;
 
-import org.apache.aurora.common.application.Lifecycle;
-import org.apache.aurora.common.base.Command;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.TaskStatusHandler;
-import org.apache.aurora.scheduler.base.Conversions;
-import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStatusReceived;
-import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.apache.mesos.Protos;
 import org.apache.mesos.SchedulerDriver;
+import org.apache.mesos.v1.Protos.ExecutorID;
 import org.apache.mesos.v1.Protos.FrameworkID;
 import org.apache.mesos.v1.Protos.OfferID;
 import org.apache.mesos.v1.Protos.TaskID;
@@ -48,64 +29,36 @@ import org.apache.mesos.v1.Protos.TaskState;
 import org.apache.mesos.v1.Protos.TaskStatus;
 import org.apache.mesos.v1.Protos.TaskStatus.Reason;
 import org.apache.mesos.v1.Protos.TaskStatus.Source;
-import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
-import static org.apache.aurora.gen.MaintenanceMode.NONE;
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.expect;
+import static org.apache.aurora.scheduler.mesos.ProtosConversion.convert;
 import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class MesosSchedulerImplTest extends EasyMockTest {
 
   private static final String FRAMEWORK_ID = "framework-id";
   private static final FrameworkID FRAMEWORK =
       FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build();
-
+  private static final String MASTER_ID = "master-id";
+  private static final Protos.MasterInfo MASTER = Protos.MasterInfo.newBuilder()
+      .setId(MASTER_ID)
+      .setIp(InetAddresses.coerceToInteger(InetAddresses.forString("1.2.3.4"))) //NOPMD
+      .setPort(5050).build();
   private static final String SLAVE_HOST = "slave-hostname";
   private static final Protos.SlaveID SLAVE_ID =
       Protos.SlaveID.newBuilder().setValue("slave-id").build();
-  private static final String SLAVE_HOST_2 = "slave-hostname-2";
-  private static final Protos.SlaveID SLAVE_ID_2 =
-      Protos.SlaveID.newBuilder().setValue("slave-id-2").build();
-  private static final Protos.ExecutorID EXECUTOR_ID =
-      Protos.ExecutorID.newBuilder().setValue("executor-id").build();
 
   private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("offer-id").build();
   private static final Protos.Offer OFFER = Protos.Offer.newBuilder()
-          .setFrameworkId(ProtosConversion.convert(FRAMEWORK))
+          .setFrameworkId(convert(FRAMEWORK))
           .setSlaveId(SLAVE_ID)
           .setHostname(SLAVE_HOST)
-          .setId(ProtosConversion.convert(OFFER_ID))
-          .build();
-  private static final HostOffer HOST_OFFER = new HostOffer(
-      ProtosConversion.convert(OFFER),
-      IHostAttributes.build(
-          new HostAttributes()
-              .setHost(SLAVE_HOST)
-              .setSlaveId(SLAVE_ID.getValue())
-              .setMode(NONE)
-              .setAttributes(ImmutableSet.of())));
-  private static final OfferID OFFER_ID_2 = OfferID.newBuilder().setValue("offer-id-2").build();
-  private static final Protos.Offer OFFER_2 = Protos.Offer.newBuilder(OFFER)
-          .setSlaveId(SLAVE_ID_2)
-          .setHostname(SLAVE_HOST_2)
-          .setId(ProtosConversion.convert(OFFER_ID_2))
+          .setId(convert(OFFER_ID))
           .build();
-  private static final HostOffer HOST_OFFER_2 = new HostOffer(
-      ProtosConversion.convert(OFFER_2),
-      IHostAttributes.build(
-          new HostAttributes()
-              .setHost(SLAVE_HOST_2)
-              .setSlaveId(SLAVE_ID_2.getValue())
-              .setMode(NONE)
-              .setAttributes(ImmutableSet.of())));
+
+  private static final ExecutorID EXECUTOR_ID =
+      ExecutorID.newBuilder().setValue("executor-id").build();
 
   private static final TaskStatus STATUS_NO_REASON = TaskStatus.newBuilder()
       .setState(TaskState.TASK_RUNNING)
@@ -121,358 +74,129 @@ public class MesosSchedulerImplTest extends EasyMockTest {
       .setReason(Reason.REASON_COMMAND_EXECUTOR_FAILED)
       .build();
 
-  private static final TaskStatus STATUS_RECONCILIATION = STATUS_NO_REASON
-      .toBuilder()
-      .setReason(Reason.REASON_RECONCILIATION)
-      .build();
-
-  private static final TaskStatusReceived PUBSUB_RECONCILIATION_EVENT = new TaskStatusReceived(
-      STATUS_RECONCILIATION.getState(),
-      Optional.of(STATUS_RECONCILIATION.getSource()),
-      Optional.of(STATUS_RECONCILIATION.getReason()),
-      Optional.of(1000000L)
-  );
-
-  private StorageTestUtil storageUtil;
-  private Command shutdownCommand;
-  private TaskStatusHandler statusHandler;
-  private OfferManager offerManager;
+  private MesosCallbackHandler handler;
   private SchedulerDriver driver;
-  private EventSink eventSink;
-  private FakeStatsProvider statsProvider;
 
   private MesosSchedulerImpl scheduler;
 
   @Before
   public void setUp() {
-    Logger log = LoggerFactory.getLogger("");
-    initializeScheduler(log);
-  }
-
-  private void initializeScheduler(Logger logger) {
-    storageUtil = new StorageTestUtil(this);
-    shutdownCommand = createMock(Command.class);
-    statusHandler = createMock(TaskStatusHandler.class);
-    offerManager = createMock(OfferManager.class);
-    eventSink = createMock(EventSink.class);
-    statsProvider = new FakeStatsProvider();
-
-    scheduler = new MesosSchedulerImpl(
-        storageUtil.storage,
-        new Lifecycle(shutdownCommand),
-        statusHandler,
-        offerManager,
-        eventSink,
-        MoreExecutors.sameThreadExecutor(),
-        new CachedCounters(statsProvider),
-        logger,
-        statsProvider);
+    handler = createMock(MesosCallbackHandler.class);
     driver = createMock(SchedulerDriver.class);
+    scheduler = new MesosSchedulerImpl(handler);
   }
 
-  @Test(expected = IllegalStateException.class)
-  public void testBadOrdering() {
-    control.replay();
+  @Test
+  public void testSlaveLost() {
+    handler.handleLostAgent(convert(SLAVE_ID));
+    expectLastCall().once();
 
-    // Should fail since the scheduler is not yet registered.
-    scheduler.resourceOffers(driver, ImmutableList.of());
-  }
+    control.replay();
 
-  @Test
-  public void testNoOffers() {
-    new AbstractRegisteredTest() {
-      @Override
-      void test() {
-        scheduler.resourceOffers(driver, ImmutableList.of());
-      }
-    }.run();
+    scheduler.slaveLost(driver, SLAVE_ID);
   }
 
   @Test
-  public void testAcceptOffer() {
-    new AbstractOfferTest() {
-      @Override
-      void respondToOffer() {
-        expectOfferAttributesSaved(HOST_OFFER);
-        offerManager.addOffer(HOST_OFFER);
-      }
-    }.run();
-  }
+  public void testRegistered() {
+    handler.handleRegistration(FRAMEWORK, convert(MASTER));
+    expectLastCall().once();
 
-  @Test
-  public void testAcceptOfferDebugLogging() {
-    Logger mockLogger = createMock(Logger.class);
-    mockLogger.info(anyString());
-    mockLogger.debug(anyString(), EasyMock.<Object>anyObject());
-    initializeScheduler(mockLogger);
-
-    new AbstractOfferTest() {
-      @Override
-      void respondToOffer() {
-        expectOfferAttributesSaved(HOST_OFFER);
-        offerManager.addOffer(HOST_OFFER);
-      }
-    }.run();
-  }
+    control.replay();
 
-  @Test
-  public void testAttributesModePreserved() {
-    new AbstractOfferTest() {
-      @Override
-      void respondToOffer() {
-        IHostAttributes draining =
-            IHostAttributes.build(HOST_OFFER.getAttributes().newBuilder().setMode(DRAINING));
-        expect(storageUtil.attributeStore.getHostAttributes(HOST_OFFER.getOffer().getHostname()))
-            .andReturn(Optional.of(draining));
-        IHostAttributes saved = IHostAttributes.build(
-            Conversions.getAttributes(HOST_OFFER.getOffer()).newBuilder().setMode(DRAINING));
-        expect(storageUtil.attributeStore.saveHostAttributes(saved)).andReturn(true);
-
-        HostOffer offer = new HostOffer(HOST_OFFER.getOffer(), draining);
-        offerManager.addOffer(offer);
-      }
-    }.run();
+    scheduler.registered(driver, convert(FRAMEWORK), MASTER);
   }
 
   @Test
-  public void testStatusUpdate() {
-    // Test multiple variations of fields in TaskStatus to cover all branches.
-    new StatusUpdater(STATUS).run();
-    control.verify();
-    control.reset();
-    new StatusUpdater(STATUS.toBuilder().clearSource().build()).run();
-    control.verify();
-    control.reset();
-    new StatusUpdater(STATUS.toBuilder().clearReason().build()).run();
-    control.verify();
-    control.reset();
-    new StatusUpdater(STATUS.toBuilder().clearMessage().build()).run();
-  }
+  public void testDisconnected() {
+    handler.handleDisconnection();
+    expectLastCall().once();
 
-  @Test(expected = SchedulerException.class)
-  public void testStatusUpdateFails() {
-    new AbstractStatusTest() {
-      @Override
-      void expectations() {
-        eventSink.post(new TaskStatusReceived(
-            STATUS.getState(),
-            Optional.of(STATUS.getSource()),
-            Optional.of(STATUS.getReason()),
-            Optional.of(1000000L)
-        ));
-        statusHandler.statusUpdate(status);
-        expectLastCall().andThrow(new StorageException("Injected."));
-      }
-    }.run();
-  }
+    control.replay();
 
-  @Test
-  public void testMultipleOffers() {
-    new AbstractRegisteredTest() {
-      @Override
-      void expectations() {
-        expectOfferAttributesSaved(HOST_OFFER);
-        expectOfferAttributesSaved(HOST_OFFER_2);
-        offerManager.addOffer(HOST_OFFER);
-        offerManager.addOffer(HOST_OFFER_2);
-      }
-
-      @Override
-      void test() {
-        scheduler.resourceOffers(driver,
-            ImmutableList.of(
-                ProtosConversion.convert(HOST_OFFER.getOffer()),
-                ProtosConversion.convert(HOST_OFFER_2.getOffer())));
-      }
-    }.run();
+    scheduler.disconnected(driver);
   }
 
   @Test
-  public void testDisconnected() {
-    new AbstractRegisteredTest() {
-      @Override
-      void expectations() {
-        eventSink.post(new DriverDisconnected());
-      }
-
-      @Override
-      void test() {
-        scheduler.disconnected(driver);
-        assertEquals(1L, statsProvider.getLongValue("scheduler_framework_disconnects"));
-      }
-    }.run();
-  }
+  public void testReRegistered() {
+    handler.handleReregistration(convert(MASTER));
+    expectLastCall().once();
 
-  @Test
-  public void testFrameworkMessageIgnored() {
     control.replay();
 
-    scheduler.frameworkMessage(
-        driver,
-        EXECUTOR_ID,
-        SLAVE_ID,
-        "hello".getBytes(StandardCharsets.UTF_8));
+    scheduler.reregistered(driver, MASTER);
   }
 
   @Test
-  public void testSlaveLost() {
+  public void testResourceOffers() {
+    handler.handleRegistration(FRAMEWORK, convert(MASTER));
+    expectLastCall().once();
+    handler.handleOffers(ImmutableList.of(convert(OFFER)));
+    expectLastCall().once();
+
     control.replay();
 
-    scheduler.slaveLost(driver, SLAVE_ID);
-    assertEquals(1L, statsProvider.getLongValue("slaves_lost"));
+    scheduler.registered(driver, convert(FRAMEWORK), MASTER);
+    scheduler.resourceOffers(driver, ImmutableList.of(OFFER));
   }
 
-  @Test
-  public void testReregistered() {
+  @Test(expected = IllegalStateException.class)
+  public void testBadOrdering() {
     control.replay();
 
-    scheduler.reregistered(driver, Protos.MasterInfo.getDefaultInstance());
+    // Should fail since the scheduler is not yet registered.
+    scheduler.resourceOffers(driver, ImmutableList.of());
   }
 
   @Test
   public void testOfferRescinded() {
-    offerManager.cancelOffer(OFFER_ID);
+    handler.handleRescind(OFFER_ID);
+    expectLastCall().once();
 
     control.replay();
 
-    scheduler.offerRescinded(driver, ProtosConversion.convert(OFFER_ID));
-    assertEquals(1L, statsProvider.getLongValue("offers_rescinded"));
+    scheduler.offerRescinded(driver, convert(OFFER_ID));
   }
 
   @Test
-  public void testError() {
-    shutdownCommand.execute();
+  public void testStatusUpdate() {
+    handler.handleUpdate(STATUS);
+    expectLastCall().once();
 
     control.replay();
 
-    scheduler.error(driver, "error");
+    scheduler.statusUpdate(driver, convert(STATUS));
   }
 
   @Test
-  public void testExecutorLost() {
+  public void testError() {
+    handler.handleError("Oh No!");
+    expectLastCall().once();
+
     control.replay();
 
-    scheduler.executorLost(driver, EXECUTOR_ID, SLAVE_ID, 1);
+    scheduler.error(driver, "Oh No!");
   }
 
   @Test
-  public void testStatusReconciliationAcceptsDebugLogging() {
-    Logger mockLogger = createMock(Logger.class);
-    mockLogger.info(anyString());
-    mockLogger.debug(anyString());
-    initializeScheduler(mockLogger);
-
-    new AbstractStatusReconciliationTest() {
-      @Override
-      void expectations() {
-        eventSink.post(PUBSUB_RECONCILIATION_EVENT);
-        statusHandler.statusUpdate(status);
-      }
-    }.run();
-  }
-
-  private class StatusUpdater extends AbstractStatusTest {
-    StatusUpdater(TaskStatus status) {
-      super(status);
-    }
-
-    @Override
-    void expectations() {
-      eventSink.post(new TaskStatusReceived(
-          status.getState(),
-          Optional.fromNullable(status.getSource()),
-          status.hasReason() ? Optional.of(status.getReason()) : Optional.absent(),
-          Optional.of(1000000L)
-      ));
-      statusHandler.statusUpdate(status);
-    }
-  }
-
-  private void expectOfferAttributesSaved(HostOffer offer) {
-    expect(storageUtil.attributeStore.getHostAttributes(offer.getOffer().getHostname()))
-        .andReturn(Optional.absent());
-    IHostAttributes defaultMode = IHostAttributes.build(
-        Conversions.getAttributes(offer.getOffer()).newBuilder().setMode(NONE));
-    expect(storageUtil.attributeStore.saveHostAttributes(defaultMode)).andReturn(true);
-  }
-
-  private abstract class AbstractRegisteredTest {
-    private final AtomicBoolean runCalled = new AtomicBoolean(false);
-
-    AbstractRegisteredTest() {
-      // Prevent otherwise silent noop tests that forget to call run().
-      addTearDown(new TearDown() {
-        @Override
-        public void tearDown() {
-          assertTrue(runCalled.get());
-        }
-      });
-    }
-
-    void run() {
-      runCalled.set(true);
-      eventSink.post(new DriverRegistered());
-      storageUtil.expectOperations();
-      storageUtil.schedulerStore.saveFrameworkId(FRAMEWORK_ID);
-      expectations();
-
-      control.replay();
-
-      scheduler.registered(
-          driver,
-          ProtosConversion.convert(FRAMEWORK),
-          Protos.MasterInfo.getDefaultInstance());
-      test();
-    }
-
-    void expectations() {
-      // Default no-op, subclasses may override.
-    }
-
-    abstract void test();
-  }
+  public void testFrameworkMessage() {
+    handler.handleMessage(EXECUTOR_ID, convert(SLAVE_ID));
+    expectLastCall().once();
 
-  private abstract class AbstractOfferTest extends AbstractRegisteredTest {
-    AbstractOfferTest() {
-      super();
-    }
-
-    abstract void respondToOffer();
-
-    @Override
-    void expectations() {
-      respondToOffer();
-    }
+    control.replay();
 
-    @Override
-    void test() {
-      scheduler.resourceOffers(
-          driver,
-          ImmutableList.of(ProtosConversion.convert(HOST_OFFER.getOffer())));
-    }
+    scheduler.frameworkMessage(
+        driver,
+        convert(EXECUTOR_ID),
+        SLAVE_ID,
+        "message".getBytes(StandardCharsets.UTF_8));
   }
 
-  private abstract class AbstractStatusTest extends AbstractRegisteredTest {
-    protected final TaskStatus status;
-
-    AbstractStatusTest() {
-      this(STATUS);
-    }
-
-    AbstractStatusTest(TaskStatus status) {
-      super();
-      this.status = status;
-    }
+  @Test
+  public void testExecutorLost() {
+    handler.handleLostExecutor(EXECUTOR_ID, convert(SLAVE_ID), 1);
 
-    @Override
-    void test() {
-      scheduler.statusUpdate(driver, ProtosConversion.convert(status));
-    }
-  }
+    control.replay();
 
-  private abstract class AbstractStatusReconciliationTest extends AbstractStatusTest {
-    AbstractStatusReconciliationTest() {
-      super(STATUS_RECONCILIATION);
-    }
+    scheduler.executorLost(driver, convert(EXECUTOR_ID), SLAVE_ID, 1);
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java
new file mode 100644
index 0000000..988ec50
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.net.InetAddresses;
+import com.google.protobuf.ByteString;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.stats.CachedCounters;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.mesos.v1.Protos.AgentID;
+import org.apache.mesos.v1.Protos.ExecutorID;
+import org.apache.mesos.v1.Protos.FrameworkID;
+import org.apache.mesos.v1.Protos.FrameworkInfo;
+import org.apache.mesos.v1.Protos.MasterInfo;
+import org.apache.mesos.v1.Protos.Offer;
+import org.apache.mesos.v1.Protos.OfferID;
+import org.apache.mesos.v1.Protos.TaskID;
+import org.apache.mesos.v1.Protos.TaskState;
+import org.apache.mesos.v1.Protos.TaskStatus;
+import org.apache.mesos.v1.Protos.TaskStatus.Source;
+import org.apache.mesos.v1.scheduler.Mesos;
+import org.apache.mesos.v1.scheduler.Protos.Call;
+import org.apache.mesos.v1.scheduler.Protos.Event;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class VersionedMesosSchedulerImplTest extends EasyMockTest {
+
+  private MesosCallbackHandler handler;
+  private StorageTestUtil storageUtil;
+  private Mesos driver;
+  private FakeStatsProvider statsProvider;
+  private DriverSettings driverSettings;
+
+  private VersionedMesosSchedulerImpl scheduler;
+
+  private static final String AGENT_HOST = "slave-hostname";
+  private static final AgentID AGENT_ID =
+      AgentID.newBuilder().setValue("slave-id").build();
+
+  private static final String FRAMEWORK_ID = "framework-id";
+  private static final FrameworkID FRAMEWORK =
+      FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build();
+  private static final FrameworkInfo FRAMEWORK_INFO = FrameworkInfo.newBuilder()
+      .setName("name")
+      .setUser("user")
+      .setCheckpoint(true)
+      .setFailoverTimeout(1000)
+      .build();
+
+  private static final String MASTER_ID = "master-id";
+  private static final MasterInfo MASTER = MasterInfo.newBuilder()
+      .setId(MASTER_ID)
+      .setIp(InetAddresses.coerceToInteger(InetAddresses.forString("1.2.3.4"))) //NOPMD
+      .setPort(5050).build();
+
+  private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("offer-id").build();
+  private static final Offer OFFER = Offer.newBuilder()
+      .setFrameworkId(FRAMEWORK)
+      .setAgentId(AGENT_ID)
+      .setHostname(AGENT_HOST)
+      .setId(OFFER_ID)
+      .build();
+
+  private static final TaskStatus STATUS = TaskStatus.newBuilder()
+      .setState(TaskState.TASK_RUNNING)
+      .setSource(Source.SOURCE_AGENT)
+      .setMessage("message")
+      .setTimestamp(1D)
+      .setTaskId(TaskID.newBuilder().setValue("task-id").build())
+      .build();
+
+  private static final ExecutorID EXECUTOR_ID =
+      ExecutorID.newBuilder().setValue("executor-id").build();
+
+  private static final Event OFFER_EVENT = Event.newBuilder()
+      .setType(Event.Type.OFFERS)
+      .setOffers(Event.Offers.newBuilder()
+          .addOffers(OFFER))
+      .build();
+
+  private static final Event SUBSCRIBED_EVENT = Event.newBuilder()
+      .setType(Event.Type.SUBSCRIBED)
+      .setSubscribed(Event.Subscribed.newBuilder()
+          .setFrameworkId(FRAMEWORK)
+          .setHeartbeatIntervalSeconds(15)
+          .setMasterInfo(MASTER))
+      .build();
+
+  private static final Event UPDATE_EVENT = Event.newBuilder()
+      .setType(Event.Type.UPDATE)
+      .setUpdate(Event.Update.newBuilder()
+          .setStatus(STATUS))
+      .build();
+
+  private static final Event RESCIND_EVENT = Event.newBuilder()
+      .setType(Event.Type.RESCIND)
+      .setRescind(Event.Rescind.newBuilder().setOfferId(OFFER_ID))
+      .build();
+
+  private static final Event MESSAGE_EVENT = Event.newBuilder()
+      .setType(Event.Type.MESSAGE)
+      .setMessage(Event.Message.newBuilder()
+          .setAgentId(AGENT_ID)
+          .setExecutorId(EXECUTOR_ID)
+          .setData(ByteString.copyFromUtf8("message")))
+      .build();
+
+  private static final Event ERROR_EVENT = Event.newBuilder()
+      .setType(Event.Type.ERROR)
+      .setError(Event.Error.newBuilder().setMessage("Oh no!"))
+      .build();
+
+  private static final Event FAILED_AGENT_EVENT = Event.newBuilder()
+      .setType(Event.Type.FAILURE)
+      .setFailure(Event.Failure.newBuilder().setAgentId(AGENT_ID))
+      .build();
+
+  @Before
+  public void setUp() {
+    handler = createMock(MesosCallbackHandler.class);
+    storageUtil = new StorageTestUtil(this);
+    driver = createMock(Mesos.class);
+    statsProvider = new FakeStatsProvider();
+    driverSettings = createMock(DriverSettings.class);
+
+    scheduler = new VersionedMesosSchedulerImpl(
+        handler,
+        new CachedCounters(statsProvider),
+        storageUtil.storage,
+        driverSettings);
+  }
+
+  @Test
+  public void testConnected() {
+    // Once the V1 driver has connected, we need to establish a subscription to get events
+
+    storageUtil.expectOperations();
+    expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+    expect(driverSettings.getFrameworkInfo()).andReturn(FRAMEWORK_INFO);
+
+    Capture<Call> subscribeCapture = createCapture();
+
+    driver.send(capture(subscribeCapture));
+    expectLastCall().once();
+
+    control.replay();
+
+    scheduler.connected(driver);
+
+    assertTrue(subscribeCapture.hasCaptured());
+
+    Call subscribe = subscribeCapture.getValue();
+    // assertEquals takes (expected, actual); this order keeps failure messages accurate.
+    assertEquals(Call.Type.SUBSCRIBE, subscribe.getType());
+    assertEquals(FRAMEWORK, subscribe.getFrameworkId());
+    assertEquals(
+        FRAMEWORK_INFO.toBuilder().setId(FRAMEWORK).build(),
+        subscribe.getSubscribe().getFrameworkInfo());
+  }
+
+  @Test
+  public void testDisconnected() {
+    handler.handleDisconnection();
+
+    control.replay();
+
+    scheduler.disconnected(driver);
+  }
+
+  @Test
+  public void testSubscription() {
+    handler.handleRegistration(FRAMEWORK, MASTER);
+
+    control.replay();
+
+    scheduler.received(driver, SUBSCRIBED_EVENT);
+
+    assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_SUBSCRIBED"));
+  }
+
+  @Test
+  public void testOffers() {
+    handler.handleRegistration(FRAMEWORK, MASTER);
+    handler.handleOffers(ImmutableList.of(OFFER));
+
+    control.replay();
+
+    scheduler.received(driver, SUBSCRIBED_EVENT);
+    scheduler.received(driver, OFFER_EVENT);
+    assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_OFFERS"));
+  }
+
+  @Test
+  public void testRescind() {
+    handler.handleRescind(OFFER_ID);
+
+    control.replay();
+
+    scheduler.received(driver, RESCIND_EVENT);
+    assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_RESCIND"));
+  }
+
+  @Test
+  public void testUpdate() {
+    handler.handleUpdate(STATUS);
+
+    control.replay();
+
+    scheduler.received(driver, UPDATE_EVENT);
+
+    assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_UPDATE"));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testBadOrdering() {
+    // get an offer before the driver has registered
+
+    control.replay();
+
+    scheduler.received(driver, OFFER_EVENT);
+  }
+
+  @Test
+  public void testMessage() {
+    handler.handleMessage(EXECUTOR_ID, AGENT_ID);
+
+    control.replay();
+
+    scheduler.received(driver, MESSAGE_EVENT);
+    assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_MESSAGE"));
+  }
+
+  @Test
+  public void testError() {
+    handler.handleError("Oh no!");
+
+    control.replay();
+
+    scheduler.received(driver, ERROR_EVENT);
+    assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_ERROR"));
+  }
+
+  @Test
+  public void testFailedAgent() {
+    handler.handleLostAgent(AGENT_ID);
+
+    control.replay();
+
+    scheduler.received(driver, FAILED_AGENT_EVENT);
+    assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_FAILURE"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java
new file mode 100644
index 0000000..72aede8
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.mesos.v1.Protos;
+import org.apache.mesos.v1.scheduler.Mesos;
+import org.apache.mesos.v1.scheduler.Protos.Call;
+import org.apache.mesos.v1.scheduler.Scheduler;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class VersionedSchedulerDriverServiceTest extends EasyMockTest {
+
+  private static final String FRAMEWORK_ID = "framework_id";
+  private static final Protos.OfferID OFFER_ID =
+      Protos.OfferID.newBuilder().setValue("offer-id").build();
+  private static final Protos.Filters FILTER =
+      Protos.Filters.newBuilder().setRefuseSeconds(10).build();
+  private static final DriverSettings SETTINGS = new DriverSettings(
+      "fakemaster",
+      Optional.absent(),
+      Protos.FrameworkInfo.newBuilder()
+          .setUser("framework user")
+          .setName("test framework")
+          .build());
+
+  private StorageTestUtil storage;
+  private Scheduler scheduler;
+  private VersionedSchedulerDriverService driverService;
+  private VersionedDriverFactory driverFactory;
+  private Mesos mesos;
+
+  @Before
+  public void setUp() {
+    scheduler = createMock(Scheduler.class);
+    storage = new StorageTestUtil(this);
+    driverFactory = createMock(VersionedDriverFactory.class);
+    mesos = createMock(Mesos.class);
+    driverService = new VersionedSchedulerDriverService(
+        storage.storage,
+        SETTINGS,
+        scheduler,
+        driverFactory);
+  }
+
+  @Test
+  public void testNoopStop() {
+    control.replay();
+
+    driverService.stopAsync().awaitTerminated();
+  }
+
+  @Test
+  public void testStop() {
+    expectStart();
+    control.replay();
+
+    driverService.startAsync().awaitRunning();
+    driverService.stopAsync().awaitTerminated();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testExceptionBeforeStart() {
+    control.replay();
+    driverService.killTask("task-id");
+  }
+
+  @Test
+  public void testBlockingBeforeRegistered() throws InterruptedException {
+    expectStart();
+    control.replay();
+    driverService.startAsync().awaitRunning();
+
+    Thread killRunner = new Thread(() -> {
+      driverService.killTask("task-id");
+    });
+
+    killRunner.start();
+
+    // A hack to ensure the thread actually executes the method
+    Thread.sleep(1000L);
+    assertEquals(Thread.State.WAITING, killRunner.getState());
+
+    killRunner.interrupt();
+  }
+
+  @Test
+  public void testKill() {
+    expectStart();
+    expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+    // Record the Call passed to Mesos#send so the assertions below can inspect it.
+    Capture<Call> killCapture = createCapture();
+    mesos.send(capture(killCapture));
+    expectLastCall().once();
+
+    control.replay();
+    driverService.startAsync().awaitRunning();
+    driverService.registered(new PubsubEvent.DriverRegistered());
+
+    driverService.killTask("task-id");
+
+    assertTrue(killCapture.hasCaptured());
+    Call kill = killCapture.getValue();
+    assertEquals(FRAMEWORK_ID, kill.getFrameworkId().getValue());
+    assertEquals(Call.Type.KILL, kill.getType());
+    assertEquals("task-id", kill.getKill().getTaskId().getValue());
+  }
+
+  @Test
+  public void testDecline() {
+    expectStart();
+    expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+    // Record the Call passed to Mesos#send so the assertions below can inspect it.
+    Capture<Call> declineCapture = createCapture();
+    mesos.send(capture(declineCapture));
+    expectLastCall().once();
+
+    control.replay();
+    driverService.startAsync().awaitRunning();
+    driverService.registered(new PubsubEvent.DriverRegistered());
+
+    driverService.declineOffer(OFFER_ID, FILTER);
+
+    assertTrue(declineCapture.hasCaptured());
+    Call decline = declineCapture.getValue();
+    assertEquals(FRAMEWORK_ID, decline.getFrameworkId().getValue());
+    assertEquals(Call.Type.DECLINE, decline.getType());
+    assertEquals(ImmutableList.of(OFFER_ID), decline.getDecline().getOfferIdsList());
+    assertEquals(FILTER, decline.getDecline().getFilters());
+  }
+
+  @Test
+  public void testAccept() {
+    expectStart();
+    expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+    // Record the Call passed to Mesos#send so the assertions below can inspect it.
+    Capture<Call> acceptCapture = createCapture();
+    mesos.send(capture(acceptCapture));
+    expectLastCall().once();
+
+    control.replay();
+    driverService.startAsync().awaitRunning();
+    driverService.registered(new PubsubEvent.DriverRegistered());
+
+    driverService.acceptOffers(OFFER_ID, ImmutableList.of(), FILTER);
+
+    assertTrue(acceptCapture.hasCaptured());
+    Call accept = acceptCapture.getValue();
+    assertEquals(FRAMEWORK_ID, accept.getFrameworkId().getValue());
+    assertEquals(Call.Type.ACCEPT, accept.getType());
+    assertEquals(FILTER, accept.getAccept().getFilters());
+    assertEquals(ImmutableList.of(OFFER_ID), accept.getAccept().getOfferIdsList());
+    assertEquals(ImmutableList.of(), accept.getAccept().getOperationsList());
+  }
+
+  private void expectStart() {
+    storage.expectOperations();
+    expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+
+    Protos.FrameworkInfo.Builder builder = SETTINGS.getFrameworkInfo().toBuilder();
+    builder.setId(Protos.FrameworkID.newBuilder().setValue(FRAMEWORK_ID));
+
+    expect(driverFactory.create(
+        scheduler,
+        builder.build(),
+        SETTINGS.getMasterUri(),
+        SETTINGS.getCredentials()
+    )).andReturn(mesos);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index f2275c7..1421821 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -67,6 +67,7 @@ import org.apache.shiro.subject.Subject;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ResponseCode.OK;
+import static org.apache.aurora.scheduler.app.SchedulerMain.DriverKind.SCHEDULER_DRIVER;
 import static org.junit.Assert.assertEquals;
 
 public class ThriftIT extends EasyMockTest {
@@ -97,7 +98,7 @@ public class ThriftIT extends EasyMockTest {
             install(new TierModule(TaskTestUtil.TIER_CONFIG));
             bind(ExecutorSettings.class).toInstance(TestExecutorSettings.THERMOS_EXECUTOR);
 
-            install(new AppModule(configurationManagerSettings));
+            install(new AppModule(configurationManagerSettings, SCHEDULER_DRIVER));
 
             bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class);
 


[2/2] aurora git commit: Enable Mesos HTTP API.

Posted by zm...@apache.org.
Enable Mesos HTTP API.

This patch completes the design doc[1] and enables operators to choose between
two V1 Mesos API implementations. The first is `V0Mesos` which offers the V1 API
backed by the scheduler driver and the second is `V1Mesos` which offers the V1
API backed by a new HTTP API implementation.

There are three sets of changes in this patch.

First, the V1 Mesos code requires a Scheduler callback with a different API. To
maximize code reuse, event handling logic was extracted into a
`MesosCallbackHandler` class. `VersionedMesosSchedulerImpl` was created to
implement the new callback interface. Both callbacks now use the handler class
for logic.

Second, a new driver implementation using the new API was created. All of the
logic for the new driver is encapsulated in the
`VersionedSchedulerDriverService` class.

Third, some wiring changes were done to allow for Guice to do its work and
allow for operators to select between the different driver implementations.

[1] https://docs.google.com/document/d/1bWK8ldaQSsRXvdKwTh8tyR_0qMxAlnMW70eOKoU3myo

Testing Done:
The e2e test has been run three times, each time with a different driver option.

Bugs closed: AURORA-1887, AURORA-1888

Reviewed at https://reviews.apache.org/r/57061/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/705dbc7c
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/705dbc7c
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/705dbc7c

Branch: refs/heads/master
Commit: 705dbc7cd7c3ff477bcf766cdafe49a68ab47dee
Parents: 2652fe0
Author: Zameer Manji <zm...@apache.org>
Authored: Thu Mar 2 15:07:11 2017 -0800
Committer: Zameer Manji <zm...@apache.org>
Committed: Thu Mar 2 15:07:11 2017 -0800

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   7 +
 examples/vagrant/upstart/aurora-scheduler.conf  |   5 +-
 .../aurora/benchmark/StatusUpdateBenchmark.java |   6 +-
 .../apache/aurora/scheduler/app/AppModule.java  |  12 +-
 .../aurora/scheduler/app/SchedulerMain.java     |  22 +-
 .../scheduler/mesos/LibMesosLoadingModule.java  |  29 +-
 .../scheduler/mesos/MesosCallbackHandler.java   | 288 +++++++++++++
 .../scheduler/mesos/MesosSchedulerImpl.java     | 212 +--------
 .../scheduler/mesos/ProtosConversion.java       |  28 ++
 .../scheduler/mesos/SchedulerDriverModule.java  |  50 ++-
 .../scheduler/mesos/VersionedDriverFactory.java |  32 ++
 .../mesos/VersionedMesosSchedulerImpl.java      | 198 +++++++++
 .../mesos/VersionedSchedulerDriverService.java  | 254 +++++++++++
 .../aurora/scheduler/app/SchedulerIT.java       |   7 +-
 .../mesos/MesosCallbackHandlerTest.java         | 430 +++++++++++++++++++
 .../scheduler/mesos/MesosSchedulerImplTest.java | 424 ++++--------------
 .../mesos/VersionedMesosSchedulerImplTest.java  | 275 ++++++++++++
 .../VersionedSchedulerDriverServiceTest.java    | 194 +++++++++
 .../aurora/scheduler/thrift/ThriftIT.java       |   3 +-
 19 files changed, 1907 insertions(+), 569 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 2391d32..1142f75 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -6,6 +6,13 @@
 - Add message parameter to `killTasks` RPC.
 - Add prune_tasks endpoint to aurora_admin. See aurora_admin prune_tasks -h for usage information.
 - Add support for per-task volume mounts for Mesos containers to the Aurora config DSL.
+- Added the `-mesos_driver` flag to the scheduler with three possible options:
+  `SCHEDULER_DRIVER`, `V0_DRIVER`, `V1_DRIVER`. The first uses the original driver
+  and the latter two use two new drivers from `libmesos`. `V0_DRIVER` uses the
+  `SCHEDULER_DRIVER` under the hood and `V1_DRIVER` uses a new HTTP API aware
+  driver. Users that want to use the HTTP API should use `V1_DRIVER`.
+  Performance sensitive users should stick with the `SCHEDULER_DRIVER` or
+  `V0_DRIVER` drivers.
 
 0.17.0
 ======

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/examples/vagrant/upstart/aurora-scheduler.conf
----------------------------------------------------------------------
diff --git a/examples/vagrant/upstart/aurora-scheduler.conf b/examples/vagrant/upstart/aurora-scheduler.conf
index 49fdcbd..31fa036 100644
--- a/examples/vagrant/upstart/aurora-scheduler.conf
+++ b/examples/vagrant/upstart/aurora-scheduler.conf
@@ -16,7 +16,7 @@ respawn
 post-stop exec sleep 5
 
 # Environment variables control the behavior of the Mesos scheduler driver (libmesos).
-env GLOG_v=0
+env GLOG_v=1
 env LIBPROCESS_PORT=8083
 env LIBPROCESS_IP=192.168.33.7
 env DIST_DIR=/home/vagrant/aurora/dist
@@ -54,4 +54,5 @@ exec bin/aurora-scheduler \
   -enable_revocable_ram=true \
   -allow_gpu_resource=true \
   -allow_container_volumes=true \
-  -offer_filter_duration=0secs
+  -offer_filter_duration=0secs \
+  -mesos_driver=V1_DRIVER

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
index 6c2bf46..95496c1 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
@@ -61,6 +61,8 @@ import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.mesos.DriverFactory;
 import org.apache.aurora.scheduler.mesos.DriverSettings;
+import org.apache.aurora.scheduler.mesos.MesosCallbackHandler;
+import org.apache.aurora.scheduler.mesos.MesosCallbackHandler.MesosCallbackHandlerImpl;
 import org.apache.aurora.scheduler.mesos.MesosSchedulerImpl;
 import org.apache.aurora.scheduler.mesos.ProtosConversion;
 import org.apache.aurora.scheduler.mesos.TestExecutorSettings;
@@ -183,8 +185,10 @@ public class StatusUpdateBenchmark {
             bind(Driver.class).toInstance(new FakeDriver());
             bind(Scheduler.class).to(MesosSchedulerImpl.class);
             bind(MesosSchedulerImpl.class).in(Singleton.class);
+            bind(MesosCallbackHandler.class).to(MesosCallbackHandlerImpl.class);
+            bind(MesosCallbackHandlerImpl.class).in(Singleton.class);
             bind(Executor.class)
-                .annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)
+                .annotatedWith(MesosCallbackHandlerImpl.SchedulerExecutor.class)
                 .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor(
                     "SchedulerImpl-%d",
                     LoggerFactory.getLogger(StatusUpdateBenchmark.class)));

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index e2ef9c3..081dff2 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -35,6 +35,7 @@ import org.apache.aurora.gen.Container;
 import org.apache.aurora.gen.Container._Fields;
 import org.apache.aurora.scheduler.SchedulerModule;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.app.SchedulerMain.DriverKind;
 import org.apache.aurora.scheduler.async.AsyncModule;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
@@ -105,13 +106,15 @@ public class AppModule extends AbstractModule {
   private static final Arg<Boolean> ALLOW_CONTAINER_VOLUMES = Arg.create(false);
 
   private final ConfigurationManagerSettings configurationManagerSettings;
+  private final DriverKind kind;
 
   @VisibleForTesting
-  public AppModule(ConfigurationManagerSettings configurationManagerSettings) {
+  public AppModule(ConfigurationManagerSettings configurationManagerSettings, DriverKind kind) {
     this.configurationManagerSettings = requireNonNull(configurationManagerSettings);
+    this.kind = kind;
   }
 
-  public AppModule(boolean allowGpuResource) {
+  public AppModule(boolean allowGpuResource, DriverKind kind) {
     this(new ConfigurationManagerSettings(
         ImmutableSet.copyOf(ALLOWED_CONTAINER_TYPES.get()),
         ENABLE_DOCKER_PARAMETERS.get(),
@@ -119,7 +122,8 @@ public class AppModule extends AbstractModule {
         REQUIRE_DOCKER_USE_EXECUTOR.get(),
         allowGpuResource,
         ENABLE_MESOS_FETCHER.get(),
-        ALLOW_CONTAINER_VOLUMES.get()));
+        ALLOW_CONTAINER_VOLUMES.get()),
+        kind);
   }
 
   @Override
@@ -149,7 +153,7 @@ public class AppModule extends AbstractModule {
     install(new QuotaModule());
     install(new JettyServerModule());
     install(new PreemptorModule());
-    install(new SchedulerDriverModule());
+    install(new SchedulerDriverModule(kind));
     install(new SchedulerServicesModule());
     install(new SchedulerModule());
     install(new StateModule());

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index 805e9de..3665c4d 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -93,6 +93,24 @@ public class SchedulerMain {
   @CmdLine(name = "allow_gpu_resource", help = "Allow jobs to request Mesos GPU resource.")
   private static final Arg<Boolean> ALLOW_GPU_RESOURCE = Arg.create(false);
 
+  public enum DriverKind {
+    // TODO(zmanji): Remove this option once V0_DRIVER has been proven out in production.
+    // This is the original driver that libmesos shipped with. Uses unversioned protobufs, and has
+    // minimal backwards compatibility guarantees.
+    SCHEDULER_DRIVER,
+    // These are the new drivers that libmesos ships with. They use versioned (V1) protobufs for
+    // the Java API.
+    // V0 Driver offers the V1 API over the old Scheduler Driver. It does not fully support
+    // the V1 API (i.e. Mesos maintenance).
+    V0_DRIVER,
+    // V1 Driver offers the V1 API over a full HTTP API implementation. It allows for maintenance
+    // primitives and other new features.
+    V1_DRIVER,
+  }
+
+  @CmdLine(name = "mesos_driver", help = "Which Mesos Driver to use")
+  private static final Arg<DriverKind> DRIVER_IMPL = Arg.create(DriverKind.SCHEDULER_DRIVER);
+
   @Inject private SingletonService schedulerService;
   @Inject private HttpService httpService;
   @Inject private SchedulerLifecycle schedulerLifecycle;
@@ -141,7 +159,7 @@ public class SchedulerMain {
     return Modules.combine(
         new LifecycleModule(),
         new StatsModule(),
-        new AppModule(ALLOW_GPU_RESOURCE.get()),
+        new AppModule(ALLOW_GPU_RESOURCE.get(), DRIVER_IMPL.get()),
         new CronModule(),
         new DbModule.MigrationManagerModule(),
         DbModule.productionModule(Bindings.annotatedKeyFactory(Storage.Volatile.class)),
@@ -203,7 +221,7 @@ public class SchedulerMain {
     List<Module> modules = ImmutableList.<Module>builder()
         .add(
             new CommandLineDriverSettingsModule(ALLOW_GPU_RESOURCE.get()),
-            new LibMesosLoadingModule(),
+            new LibMesosLoadingModule(DRIVER_IMPL.get()),
             new MesosLogStreamModule(FlaggedZooKeeperConfig.create()),
             new LogStorageModule(),
             new TierModule(),

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java
index e1a2359..3e943ff 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java
@@ -15,12 +15,39 @@ package org.apache.aurora.scheduler.mesos;
 
 import com.google.inject.AbstractModule;
 
+import org.apache.aurora.scheduler.app.SchedulerMain;
+import org.apache.mesos.v1.scheduler.V0Mesos;
+import org.apache.mesos.v1.scheduler.V1Mesos;
+
+import static com.google.common.base.Preconditions.checkState;
+
 /**
  * A module that binds a driver factory which requires the libmesos native libary.
  */
 public class LibMesosLoadingModule extends AbstractModule {
+  private final SchedulerMain.DriverKind kind;
+
+  public LibMesosLoadingModule(SchedulerMain.DriverKind kind)  {
+    this.kind = kind;
+  }
+
   @Override
   protected void configure() {
-    bind(DriverFactory.class).to(DriverFactoryImpl.class);
+    switch(kind) {
+      case SCHEDULER_DRIVER:
+        bind(DriverFactory.class).to(DriverFactoryImpl.class);
+        break;
+      case V0_DRIVER:
+        bind(VersionedDriverFactory.class).toInstance((scheduler, frameworkInfo, master, creds)
+            -> new V0Mesos(scheduler, frameworkInfo, master, creds.orNull()));
+        break;
+      case V1_DRIVER:
+        bind(VersionedDriverFactory.class).toInstance((scheduler, frameworkInfo, master, creds)
+            -> new V1Mesos(scheduler, master, creds.orNull()));
+        break;
+      default:
+        checkState(false, "Unknown driver kind");
+        break;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
new file mode 100644
index 0000000..801551b
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.inject.Inject;
+import javax.inject.Qualifier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+
+import org.apache.aurora.common.application.Lifecycle;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.TaskStatusHandler;
+import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.mesos.v1.Protos.AgentID;
+import org.apache.mesos.v1.Protos.ExecutorID;
+import org.apache.mesos.v1.Protos.FrameworkID;
+import org.apache.mesos.v1.Protos.MasterInfo;
+import org.apache.mesos.v1.Protos.Offer;
+import org.apache.mesos.v1.Protos.OfferID;
+import org.apache.mesos.v1.Protos.TaskStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.mesos.v1.Protos.TaskStatus.Reason.REASON_RECONCILIATION;
+
+/**
+ * Abstracts the logic of handling scheduler events/callbacks from Mesos.
+ * This interface allows the two different Mesos Scheduler Callback classes
+ * to share logic and to simplify testing.
+ */
+public interface MesosCallbackHandler {
+  void handleRegistration(FrameworkID frameworkId, MasterInfo masterInfo);
+  void handleReregistration(MasterInfo masterInfo);
+  void handleOffers(List<Offer> offers);
+  void handleDisconnection();
+  void handleRescind(OfferID offerId);
+  void handleMessage(ExecutorID executor, AgentID agent);
+  void handleError(String message);
+  void handleUpdate(TaskStatus status);
+  void handleLostAgent(AgentID agentId);
+  void handleLostExecutor(ExecutorID executorID, AgentID slaveID, int status);
+
+  class MesosCallbackHandlerImpl implements MesosCallbackHandler {
+
+    private final TaskStatusHandler taskStatusHandler;
+    private final OfferManager offerManager;
+    private final Storage storage;
+    private final Lifecycle lifecycle;
+    private final EventSink eventSink;
+    private final Executor executor;
+    private final Logger log;
+
+    private final AtomicLong offersRescinded;
+    private final AtomicLong slavesLost;
+    private final AtomicLong reRegisters;
+    private final AtomicLong offersRecieved;
+    private final AtomicLong disconnects;
+    private final AtomicLong executorsLost;
+
+    /**
+     * Binding annotation for the executor the incoming Mesos message handler uses.
+     */
+    @VisibleForTesting
+    @Qualifier
+    @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+    public @interface SchedulerExecutor { }
+
+    /**
+     * Creates a new handler for callbacks.
+     *
+     * @param storage Store to save host attributes into.
+     * @param lifecycle Application lifecycle manager.
+     * @param taskStatusHandler Task status update manager.
+     * @param offerManager Offer manager.
+     * @param eventSink Pubsub sink to send driver status changes to.
+     * @param executor Executor for async work
+     */
+    @Inject
+    public MesosCallbackHandlerImpl(
+        Storage storage,
+        Lifecycle lifecycle,
+        TaskStatusHandler taskStatusHandler,
+        OfferManager offerManager,
+        EventSink eventSink,
+        @SchedulerExecutor Executor executor,
+        StatsProvider statsProvider) {
+
+      this(
+          storage,
+          lifecycle,
+          taskStatusHandler,
+          offerManager,
+          eventSink,
+          executor,
+          LoggerFactory.getLogger(MesosCallbackHandlerImpl.class),
+          statsProvider);
+    }
+
+    @VisibleForTesting
+    MesosCallbackHandlerImpl(
+        Storage storage,
+        Lifecycle lifecycle,
+        TaskStatusHandler taskStatusHandler,
+        OfferManager offerManager,
+        EventSink eventSink,
+        Executor executor,
+        Logger log,
+        StatsProvider statsProvider) {
+
+      this.storage = requireNonNull(storage);
+      this.lifecycle = requireNonNull(lifecycle);
+      this.taskStatusHandler = requireNonNull(taskStatusHandler);
+      this.offerManager = requireNonNull(offerManager);
+      this.eventSink = requireNonNull(eventSink);
+      this.executor = requireNonNull(executor);
+      this.log = requireNonNull(log);
+
+      this.offersRescinded = statsProvider.makeCounter("offers_rescinded");
+      this.slavesLost = statsProvider.makeCounter("slaves_lost");
+      this.reRegisters = statsProvider.makeCounter("scheduler_framework_reregisters");
+      this.offersRecieved = statsProvider.makeCounter("scheduler_resource_offers");
+      this.disconnects = statsProvider.makeCounter("scheduler_framework_disconnects");
+      this.executorsLost = statsProvider.makeCounter("scheduler_lost_executors");
+    }
+
+    @Override
+    public void handleRegistration(FrameworkID frameworkId, MasterInfo masterInfo) {
+      log.info("Registered with ID " + frameworkId + ", master: " + masterInfo);
+
+      storage.write(
+          (Storage.MutateWork.NoResult.Quiet) storeProvider ->
+              storeProvider.getSchedulerStore().saveFrameworkId(frameworkId.getValue()));
+      eventSink.post(new PubsubEvent.DriverRegistered());
+    }
+
+    @Override
+    public void handleReregistration(MasterInfo masterInfo) {
+      log.info("Framework re-registered with master " + masterInfo);
+      reRegisters.incrementAndGet();
+    }
+
+    @Override
+    public void handleOffers(List<Offer> offers) {
+      // Don't invoke the executor or storage lock if the list of offers is empty.
+      if (offers.isEmpty()) {
+        return;
+      }
+
+      executor.execute(() -> {
+        // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over
+        //                offers when the host attributes cannot be found. (AURORA-137)
+        storage.write((Storage.MutateWork.NoResult.Quiet) storeProvider -> {
+          for (Offer offer : offers) {
+            IHostAttributes attributes =
+                AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), offer);
+            storeProvider.getAttributeStore().saveHostAttributes(attributes);
+            log.debug("Received offer: {}", offer);
+            offersRecieved.incrementAndGet();
+            offerManager.addOffer(new HostOffer(offer, attributes));
+          }
+        });
+      });
+    }
+
+    @Override
+    public void handleDisconnection() {
+      log.warn("Framework disconnected.");
+      this.disconnects.incrementAndGet();
+      eventSink.post(new PubsubEvent.DriverDisconnected());
+    }
+
+    @Override
+    public void handleRescind(OfferID offerId) {
+      log.info("Offer rescinded: " + offerId);
+      offerManager.cancelOffer(offerId);
+      offersRescinded.incrementAndGet();
+    }
+
+    @Override
+    public void handleMessage(ExecutorID executorID, AgentID agentID) {
+      log.warn(
+          "Ignoring framework message from {} on {}.",
+          executorID.getValue(),
+          agentID.getValue());
+    }
+
+    @Override
+    public void handleError(String message) {
+      log.error("Received error message: " + message);
+      lifecycle.shutdown();
+    }
+
+    private static void logStatusUpdate(Logger logger, TaskStatus status) {
+      // Periodic task reconciliation runs generate a large amount of no-op messages.
+      // Suppress logging for reconciliation status updates by default.
+      boolean debugLevel = status.hasReason() && status.getReason() == REASON_RECONCILIATION;
+
+      StringBuilder message = new StringBuilder("Received status update for task ")
+          .append(status.getTaskId().getValue())
+          .append(" in state ")
+          .append(status.getState());
+      if (status.hasSource()) {
+        message.append(" from ").append(status.getSource());
+      }
+      if (status.hasReason()) {
+        message.append(" with ").append(status.getReason());
+      }
+      if (status.hasMessage()) {
+        message.append(": ").append(status.getMessage());
+      }
+      if (debugLevel) {
+        logger.debug(message.toString());
+      } else {
+        logger.info(message.toString());
+      }
+    }
+
+    private static final Function<Double, Long> SECONDS_TO_MICROS =
+        seconds -> (long) (seconds * 1E6);
+
+    @Override
+    public void handleUpdate(TaskStatus status) {
+      logStatusUpdate(log, status);
+      eventSink.post(new PubsubEvent.TaskStatusReceived(
+          status.getState(),
+          // Source and Reason are enums. They cannot be null so we need to use `hasXXX`.
+          status.hasSource() ? Optional.of(status.getSource()) : Optional.absent(),
+          status.hasReason() ? Optional.of(status.getReason()) : Optional.absent(),
+          Optional.fromNullable(status.getTimestamp()).transform(SECONDS_TO_MICROS)));
+
+      try {
+        // The status handler is responsible for acknowledging the update.
+        taskStatusHandler.statusUpdate(status);
+      } catch (SchedulerException e) {
+        log.error("Status update failed due to scheduler exception: " + e, e);
+        // We re-throw the exception here to trigger an abort of the driver.
+        throw e;
+      }
+    }
+
+    @Override
+    public void handleLostAgent(AgentID agentId) {
+      log.info("Received notification of lost agent: " + agentId);
+      slavesLost.incrementAndGet();
+    }
+
+    @Override
+    public void handleLostExecutor(ExecutorID executorID, AgentID slaveID, int status) {
+      // With the current implementation of MESOS-313, Mesos is also reporting clean terminations of
+      // custom executors via the executorLost callback.
+      if (status != 0) {
+        log.warn("Lost executor " + executorID + " on slave " + slaveID + " with status " + status);
+        executorsLost.incrementAndGet();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
index eb21096..c3a34d2 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
@@ -13,37 +13,14 @@
  */
 package org.apache.aurora.scheduler.mesos;
 
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
 import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.inject.Inject;
-import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
 
 import org.apache.aurora.GuiceUtils.AllowUnchecked;
-import org.apache.aurora.common.application.Lifecycle;
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.stats.StatsProvider;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.TaskStatusHandler;
-import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStatusReceived;
-import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.mesos.Protos.ExecutorID;
 import org.apache.mesos.Protos.FrameworkID;
 import org.apache.mesos.Protos.MasterInfo;
@@ -53,103 +30,29 @@ import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskStatus;
 import org.apache.mesos.Scheduler;
 import org.apache.mesos.SchedulerDriver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.mesos.Protos.TaskStatus.Reason.REASON_RECONCILIATION;
+import static com.google.common.base.Preconditions.checkState;
+
+import static org.apache.aurora.scheduler.mesos.ProtosConversion.convert;
 
 /**
- * Location for communication with mesos.
+ * Implementation of Scheduler callback interfaces for Mesos SchedulerDriver.
  */
 @VisibleForTesting
 public class MesosSchedulerImpl implements Scheduler {
-  private final TaskStatusHandler taskStatusHandler;
-  private final OfferManager offerManager;
-  private final Storage storage;
-  private final Lifecycle lifecycle;
-  private final EventSink eventSink;
-  private final Executor executor;
-  private final Logger log;
-  private final CachedCounters counters;
+  private final MesosCallbackHandler handler;
   private volatile boolean isRegistered = false;
-  private final AtomicLong offersRescinded;
-  private final AtomicLong slavesLost;
 
-  /**
-   * Binding annotation for the executor the incoming Mesos message handler uses.
-   */
-  @VisibleForTesting
-  @Qualifier
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  public @interface SchedulerExecutor { }
-
-  /**
-   * Creates a new scheduler.
-   *
-   * @param storage Store to save host attributes into.
-   * @param lifecycle Application lifecycle manager.
-   * @param taskStatusHandler Task status update manager.
-   * @param offerManager Offer manager.
-   * @param eventSink Pubsub sink to send driver status changes to.
-   * @param executor Executor for async work
-   */
   @Inject
-  public MesosSchedulerImpl(
-      Storage storage,
-      Lifecycle lifecycle,
-      TaskStatusHandler taskStatusHandler,
-      OfferManager offerManager,
-      EventSink eventSink,
-      @SchedulerExecutor Executor executor,
-      CachedCounters counters,
-      StatsProvider statsProvider) {
-
-    this(
-        storage,
-        lifecycle,
-        taskStatusHandler,
-        offerManager,
-        eventSink,
-        executor,
-        counters,
-        LoggerFactory.getLogger(MesosSchedulerImpl.class),
-        statsProvider);
-  }
-
-  @VisibleForTesting
-  MesosSchedulerImpl(
-      Storage storage,
-      Lifecycle lifecycle,
-      TaskStatusHandler taskStatusHandler,
-      OfferManager offerManager,
-      EventSink eventSink,
-      Executor executor,
-      CachedCounters counters,
-      Logger log,
-      StatsProvider statsProvider) {
-
-    this.storage = requireNonNull(storage);
-    this.lifecycle = requireNonNull(lifecycle);
-    this.taskStatusHandler = requireNonNull(taskStatusHandler);
-    this.offerManager = requireNonNull(offerManager);
-    this.eventSink = requireNonNull(eventSink);
-    this.executor = requireNonNull(executor);
-    this.counters = requireNonNull(counters);
-    this.log = requireNonNull(log);
-    this.offersRescinded = statsProvider.makeCounter("offers_rescinded");
-    this.slavesLost = statsProvider.makeCounter("slaves_lost");
+  MesosSchedulerImpl(MesosCallbackHandler handler) {
+    this.handler = requireNonNull(handler);
   }
 
   @Override
   public void slaveLost(SchedulerDriver schedulerDriver, SlaveID slaveId) {
-    log.info("Received notification of lost agent: " + slaveId);
-    slavesLost.incrementAndGet();
+    handler.handleLostAgent(convert(slaveId));
   }
 
   @Override
@@ -157,122 +60,48 @@ public class MesosSchedulerImpl implements Scheduler {
       SchedulerDriver driver,
       final FrameworkID frameworkId,
       MasterInfo masterInfo) {
-
-    log.info("Registered with ID " + frameworkId + ", master: " + masterInfo);
-
-    storage.write(
-        (NoResult.Quiet) storeProvider ->
-            storeProvider.getSchedulerStore().saveFrameworkId(frameworkId.getValue()));
+    handler.handleRegistration(convert(frameworkId), convert(masterInfo));
     isRegistered = true;
-    eventSink.post(new DriverRegistered());
   }
 
   @Override
   public void disconnected(SchedulerDriver schedulerDriver) {
-    log.warn("Framework disconnected.");
-    counters.get("scheduler_framework_disconnects").incrementAndGet();
-    eventSink.post(new DriverDisconnected());
+    handler.handleDisconnection();
   }
 
   @Override
   public void reregistered(SchedulerDriver schedulerDriver, MasterInfo masterInfo) {
-    log.info("Framework re-registered with master " + masterInfo);
-    counters.get("scheduler_framework_reregisters").incrementAndGet();
+    handler.handleReregistration(convert(masterInfo));
   }
 
   @Timed("scheduler_resource_offers")
   @Override
   public void resourceOffers(SchedulerDriver driver, final List<Offer> offers) {
-    Preconditions.checkState(isRegistered, "Must be registered before receiving offers.");
-
-    executor.execute(() -> {
-      // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over
-      //                offers when the host attributes cannot be found. (AURORA-137)
-      storage.write((NoResult.Quiet) storeProvider -> {
-        for (Offer offer : offers) {
-          org.apache.mesos.v1.Protos.Offer o = ProtosConversion.convert(offer);
-          IHostAttributes attributes =
-              AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), o);
-          storeProvider.getAttributeStore().saveHostAttributes(attributes);
-          log.debug("Received offer: {}", offer);
-          counters.get("scheduler_resource_offers").incrementAndGet();
-          offerManager.addOffer(new HostOffer(o, attributes));
-        }
-      });
-    });
+    checkState(isRegistered, "Must be registered before receiving offers.");
+    handler.handleOffers(Lists.transform(offers, ProtosConversion::convert));
   }
 
   @Override
   public void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerId) {
-    log.info("Offer rescinded: " + offerId);
-    offerManager.cancelOffer(ProtosConversion.convert(offerId));
-    offersRescinded.incrementAndGet();
-  }
-
-  private static void logStatusUpdate(Logger logger, TaskStatus status) {
-    // Periodic task reconciliation runs generate a large amount of no-op messages.
-    // Suppress logging for reconciliation status updates by default.
-    boolean debugLevel = status.hasReason() && status.getReason() == REASON_RECONCILIATION;
-
-    StringBuilder message = new StringBuilder("Received status update for task ")
-        .append(status.getTaskId().getValue())
-        .append(" in state ")
-        .append(status.getState());
-    if (status.hasSource()) {
-      message.append(" from ").append(status.getSource());
-    }
-    if (status.hasReason()) {
-      message.append(" with ").append(status.getReason());
-    }
-    if (status.hasMessage()) {
-      message.append(": ").append(status.getMessage());
-    }
-    if (debugLevel) {
-      logger.debug(message.toString());
-    } else {
-      logger.info(message.toString());
-    }
+    handler.handleRescind(convert(offerId));
   }
 
-  private static final Function<Double, Long> SECONDS_TO_MICROS = seconds -> (long) (seconds * 1E6);
-
   @AllowUnchecked
   @Timed("scheduler_status_update")
   @Override
   public void statusUpdate(SchedulerDriver driver, TaskStatus status) {
-    logStatusUpdate(log, status);
-    org.apache.mesos.v1.Protos.TaskStatus converted = ProtosConversion.convert(status);
-    eventSink.post(new TaskStatusReceived(
-        converted.getState(),
-        Optional.fromNullable(converted.getSource()),
-        converted.hasReason() ? Optional.of(converted.getReason()) : Optional.absent(),
-        Optional.fromNullable(converted.getTimestamp()).transform(SECONDS_TO_MICROS)));
-
-    try {
-      // The status handler is responsible for acknowledging the update.
-      taskStatusHandler.statusUpdate(converted);
-    } catch (SchedulerException e) {
-      log.error("Status update failed due to scheduler exception: " + e, e);
-      // We re-throw the exception here to trigger an abort of the driver.
-      throw e;
-    }
+    handler.handleUpdate(convert(status));
   }
 
   @Override
   public void error(SchedulerDriver driver, String message) {
-    log.error("Received error message: " + message);
-    lifecycle.shutdown();
+    handler.handleError(message);
   }
 
   @Override
   public void executorLost(SchedulerDriver schedulerDriver, ExecutorID executorID, SlaveID slaveID,
       int status) {
-    // With the current implementation of MESOS-313, Mesos is also reporting clean terminations of
-    // custom executors via the executorLost callback.
-    if (status != 0) {
-      log.warn("Lost executor " + executorID + " on slave " + slaveID + " with status " + status);
-      counters.get("scheduler_lost_executors").incrementAndGet();
-    }
+    handler.handleLostExecutor(convert(executorID), convert(slaveID), status);
   }
 
   @Timed("scheduler_framework_message")
@@ -282,7 +111,6 @@ public class MesosSchedulerImpl implements Scheduler {
       ExecutorID executorID,
       SlaveID slave,
       byte[] data) {
-
-    log.warn("Ignoring framework message.");
+    handler.handleMessage(convert(executorID), convert(slave));
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/ProtosConversion.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/ProtosConversion.java b/src/main/java/org/apache/aurora/scheduler/mesos/ProtosConversion.java
index bc9e23b..26112af 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/ProtosConversion.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/ProtosConversion.java
@@ -55,16 +55,36 @@ public final class ProtosConversion {
     return convert(id, Protos.OfferID.newBuilder());
   }
 
+  public static Protos.FrameworkID convert(org.apache.mesos.Protos.FrameworkID id) {
+    return convert(id, Protos.FrameworkID.newBuilder());
+  }
+
+  public static Protos.MasterInfo convert(org.apache.mesos.Protos.MasterInfo id) {
+    return convert(id, Protos.MasterInfo.newBuilder());
+  }
+
   public static Protos.TaskStatus convert(org.apache.mesos.Protos.TaskStatus s) {
     return convert(s, Protos.TaskStatus.newBuilder());
   }
 
+  public static Protos.AgentID convert(org.apache.mesos.Protos.SlaveID s) {
+    return convert(s, Protos.AgentID.newBuilder());
+  }
+
+  public static Protos.ExecutorID convert(org.apache.mesos.Protos.ExecutorID s) {
+    return convert(s, Protos.ExecutorID.newBuilder());
+  }
+
   // Methods to convert from V1 to unversioned.
 
   public static org.apache.mesos.Protos.FrameworkID convert(Protos.FrameworkID id) {
     return convert(id, org.apache.mesos.Protos.FrameworkID.newBuilder());
   }
 
+  public static org.apache.mesos.Protos.MasterInfo convert(Protos.MasterInfo id) {
+    return convert(id, org.apache.mesos.Protos.MasterInfo.newBuilder());
+  }
+
   public static org.apache.mesos.Protos.TaskID convert(Protos.TaskID id) {
     return convert(id, org.apache.mesos.Protos.TaskID.newBuilder());
   }
@@ -96,4 +116,12 @@ public final class ProtosConversion {
   public static org.apache.mesos.Protos.Offer convert(Protos.Offer f) {
     return convert(f, org.apache.mesos.Protos.Offer.newBuilder());
   }
+
+  public static org.apache.mesos.Protos.ExecutorID convert(Protos.ExecutorID f) {
+    return convert(f, org.apache.mesos.Protos.ExecutorID.newBuilder());
+  }
+
+  public static org.apache.mesos.Protos.SlaveID convert(Protos.AgentID f) {
+    return convert(f, org.apache.mesos.Protos.SlaveID.newBuilder());
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
index 5519323..10d4f1b 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
@@ -18,38 +18,58 @@ import java.util.concurrent.Executor;
 import javax.inject.Singleton;
 
 import com.google.inject.AbstractModule;
-import com.google.inject.PrivateModule;
 
+import org.apache.aurora.scheduler.app.SchedulerMain;
 import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.mesos.MesosCallbackHandler.MesosCallbackHandlerImpl;
 import org.apache.mesos.Scheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static com.google.common.base.Preconditions.checkState;
+
 /**
  * A module that creates a {@link Driver} binding.
  */
 public class SchedulerDriverModule extends AbstractModule {
   private static final Logger LOG = LoggerFactory.getLogger(SchedulerDriverModule.class);
+  private final SchedulerMain.DriverKind kind;
+
+  public SchedulerDriverModule(SchedulerMain.DriverKind kind) {
+    this.kind = kind;
+  }
 
   @Override
   protected void configure() {
-    install(new PrivateModule() {
-      @Override
-      protected void configure() {
+    bind(Scheduler.class).to(MesosSchedulerImpl.class);
+    bind(org.apache.mesos.v1.scheduler.Scheduler.class).to(VersionedMesosSchedulerImpl.class);
+    bind(MesosSchedulerImpl.class).in(Singleton.class);
+    bind(MesosCallbackHandler.class).to(MesosCallbackHandlerImpl.class);
+    bind(MesosCallbackHandlerImpl.class).in(Singleton.class);
+    // TODO(zmanji): Create singleThreadedExecutor (non-scheduled) variant.
+    bind(Executor.class).annotatedWith(MesosCallbackHandlerImpl.SchedulerExecutor.class)
+        .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SchedulerImpl-%d", LOG));
+
+    switch (kind) {
+      case SCHEDULER_DRIVER:
         bind(Driver.class).to(SchedulerDriverService.class);
         bind(SchedulerDriverService.class).in(Singleton.class);
-        expose(Driver.class);
-
-        bind(Scheduler.class).to(MesosSchedulerImpl.class);
-        bind(MesosSchedulerImpl.class).in(Singleton.class);
-        expose(Scheduler.class);
-
-        // TODO(zmanji): Create singleThreadedExecutor (non-scheduled) variant.
-        bind(Executor.class).annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)
-            .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SchedulerImpl-%d", LOG));
-      }
-    });
+        break;
+      case V0_DRIVER:
+        bind(Driver.class).to(VersionedSchedulerDriverService.class);
+        bind(VersionedSchedulerDriverService.class).in(Singleton.class);
+        PubsubEventModule.bindSubscriber(binder(), VersionedSchedulerDriverService.class);
+        break;
+      case V1_DRIVER:
+        bind(Driver.class).to(VersionedSchedulerDriverService.class);
+        bind(VersionedSchedulerDriverService.class).in(Singleton.class);
+        PubsubEventModule.bindSubscriber(binder(), VersionedSchedulerDriverService.class);
+        break;
+      default:
+        checkState(false, "Unknown driver kind.");
+        break;
+    }
 
     PubsubEventModule.bindSubscriber(binder(), TaskStatusStats.class);
     bind(TaskStatusStats.class).in(Singleton.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/VersionedDriverFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedDriverFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedDriverFactory.java
new file mode 100644
index 0000000..8afeec1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedDriverFactory.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.base.Optional;
+
+import org.apache.mesos.v1.Protos;
+import org.apache.mesos.v1.scheduler.Mesos;
+import org.apache.mesos.v1.scheduler.Scheduler;
+
+/**
+ * A layer over the constructor for {@link org.apache.mesos.v1.scheduler.Mesos}. This is needed
+ * since {@link org.apache.mesos.v1.scheduler.Mesos} implementations statically load libmesos.
+ */
+public interface VersionedDriverFactory {
+  Mesos create(
+      Scheduler scheduler,
+      Protos.FrameworkInfo frameworkInfo,
+      String master,
+      Optional<Protos.Credential> credentials);
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
new file mode 100644
index 0000000..84e3f47
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.util.List;
+import javax.inject.Inject;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
+
+import org.apache.aurora.common.inject.TimedInterceptor;
+import org.apache.aurora.scheduler.stats.CachedCounters;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.mesos.v1.Protos;
+import org.apache.mesos.v1.scheduler.Mesos;
+import org.apache.mesos.v1.scheduler.Protos.Call;
+import org.apache.mesos.v1.scheduler.Protos.Event;
+import org.apache.mesos.v1.scheduler.Scheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Implementation of Scheduler callback interfaces for the V1 Driver.
+ */
+public class VersionedMesosSchedulerImpl implements Scheduler {
+  private static final Logger LOG = LoggerFactory.getLogger(VersionedMesosSchedulerImpl.class);
+
+  private final CachedCounters counters;
+  private final MesosCallbackHandler handler;
+  private final Storage storage;
+  private final DriverSettings settings;
+
+  private volatile boolean isRegistered = false;
+
+  private static final String EVENT_COUNTER_STAT_PREFIX = "mesos_scheduler_event_";
+  // A cache to hold the metric names to prevent us from creating strings for every event
+  private final LoadingCache<Event.Type, String> eventMetricNameCache = CacheBuilder.newBuilder()
+      .maximumSize(Event.Type.values().length)
+      .initialCapacity(Event.Type.values().length)
+      .build(new CacheLoader<Event.Type, String>() {
+        @Override
+        public String load(Event.Type key) throws Exception {
+          return EVENT_COUNTER_STAT_PREFIX + key.name();
+        }
+      });
+
+  @Inject
+  VersionedMesosSchedulerImpl(
+      MesosCallbackHandler handler,
+      CachedCounters counters,
+      Storage storage,
+      DriverSettings settings) {
+    this.handler = requireNonNull(handler);
+    this.counters = requireNonNull(counters);
+    this.storage = requireNonNull(storage);
+    this.settings = requireNonNull(settings);
+    initializeEventMetrics();
+  }
+
+  @Override
+  public void connected(Mesos mesos) {
+    LOG.info("Connected to Mesos master.");
+
+    Optional<String> frameworkId = storage.read(
+        storeProvider -> storeProvider.getSchedulerStore().fetchFrameworkId());
+
+    Protos.FrameworkInfo.Builder frameworkBuilder = settings.getFrameworkInfo().toBuilder();
+
+    Call.Builder call = Call.newBuilder().setType(Call.Type.SUBSCRIBE);
+
+    if (frameworkId.isPresent()) {
+      LOG.info("Found persisted framework ID: " + frameworkId);
+      Protos.FrameworkID id = Protos.FrameworkID.newBuilder().setValue(frameworkId.get()).build();
+      frameworkBuilder.setId(id);
+      call.setFrameworkId(id);
+    } else {
+      frameworkBuilder.clearId();
+      call.clearFrameworkId();
+      LOG.warn("Did not find a persisted framework ID, connecting as a new framework.");
+    }
+
+    LOG.info("Sending subscribe call");
+    mesos.send(call.setSubscribe(Call.Subscribe.newBuilder()
+        .setFrameworkInfo(frameworkBuilder.build())
+        .build())
+        .build());
+  }
+
+  @Override
+  public void disconnected(Mesos mesos) {
+    handler.handleDisconnection();
+  }
+
+  private void initializeEventMetrics() {
+    // For variable named metrics that are keyed on mesos enums, this ensures that we set
+    // all possible metrics to 0.
+    for (Event.Type type : Event.Type.values()) {
+      this.counters.get(eventMetricNameCache.getUnchecked(type));
+    }
+  }
+
+  private void countEventMetrics(Event event) {
+    this.counters.get(eventMetricNameCache.getUnchecked(event.getType())).incrementAndGet();
+  }
+
+  @TimedInterceptor.Timed("scheduler_received")
+  @Override
+  public void received(Mesos mesos, Event event) {
+    countEventMetrics(event);
+    switch(event.getType()) {
+      case SUBSCRIBED:
+        Event.Subscribed subscribed = event.getSubscribed();
+        if (isRegistered) {
+          handler.handleReregistration(subscribed.getMasterInfo());
+        } else {
+          handler.handleRegistration(subscribed.getFrameworkId(), subscribed.getMasterInfo());
+          isRegistered = true;
+        }
+        break;
+
+      case OFFERS:
+        checkState(isRegistered, "Must be registered before receiving offers.");
+        handler.handleOffers(event.getOffers().getOffersList());
+        break;
+
+      case RESCIND:
+        handler.handleRescind(event.getRescind().getOfferId());
+        break;
+
+      case INVERSE_OFFERS:
+        List<Protos.InverseOffer> offers = event.getInverseOffers().getInverseOffersList();
+        String ids = Joiner.on(",").join(
+            Lists.transform(offers, input -> input.getId().getValue()));
+        LOG.warn("Ignoring inverse offers: {}", ids);
+        break;
+
+      case RESCIND_INVERSE_OFFER:
+        Protos.OfferID id = event.getRescindInverseOffer().getInverseOfferId();
+        LOG.warn("Ignoring rescinded inverse offer: {}", id);
+        break;
+
+      case UPDATE:
+        Protos.TaskStatus status = event.getUpdate().getStatus();
+        handler.handleUpdate(status);
+        break;
+
+      case MESSAGE:
+        Event.Message m = event.getMessage();
+        handler.handleMessage(m.getExecutorId(), m.getAgentId());
+        break;
+
+      case ERROR:
+        handler.handleError(event.getError().getMessage());
+        break;
+
+      case FAILURE:
+        Event.Failure failure = event.getFailure();
+        if (failure.hasExecutorId()) {
+          handler.handleLostExecutor(
+              failure.getExecutorId(),
+              failure.getAgentId(),
+              failure.getStatus());
+        } else {
+          handler.handleLostAgent(failure.getAgentId());
+        }
+        break;
+
+      // TODO(zmanji): handle HEARTBEAT in a graceful manner
+      // For now it is ok to silently ignore heart beats because the driver will
+      // detect disconnections for us.
+      case HEARTBEAT:
+        break;
+
+      default:
+        LOG.warn("Unknown event from Mesos \n{}", event);
+        break;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java
new file mode 100644
index 0000000..9f39aeb
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Collections2;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.inject.Inject;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.mesos.v1.Protos.Credential;
+import org.apache.mesos.v1.Protos.Filters;
+import org.apache.mesos.v1.Protos.FrameworkID;
+import org.apache.mesos.v1.Protos.FrameworkInfo;
+import org.apache.mesos.v1.Protos.Offer.Operation;
+import org.apache.mesos.v1.Protos.OfferID;
+import org.apache.mesos.v1.Protos.TaskID;
+import org.apache.mesos.v1.Protos.TaskStatus;
+import org.apache.mesos.v1.scheduler.Mesos;
+import org.apache.mesos.v1.scheduler.Protos.Call;
+import org.apache.mesos.v1.scheduler.Scheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * A driver implementation that uses the V1 API drivers from libmesos.
+ *
+ * <p>The {@link Mesos} handle is created in {@link #startUp()} and published through
+ * {@code mesosFuture}. Every outgoing call first checks that the service is running and then
+ * blocks until a {@link DriverRegistered} event has been observed, because only from that point
+ * on is the framework ID needed to build a {@link Call} guaranteed to be available.
+ */
+class VersionedSchedulerDriverService extends AbstractIdleService
+    implements Driver, EventSubscriber {
+  private static final Logger LOG = LoggerFactory.getLogger(VersionedSchedulerDriverService.class);
+
+  private final Storage storage;
+  private final DriverSettings driverSettings;
+  private final Scheduler scheduler;
+  private final VersionedDriverFactory factory;
+  // Completed in startUp() with the handle that outgoing calls are sent through.
+  private final SettableFuture<Mesos> mesosFuture = SettableFuture.create();
+  // Released by shutDown() and abort(); blockUntilStopped() waits on it.
+  private final CountDownLatch terminationLatch = new CountDownLatch(1);
+  // Released when a DriverRegistered event is received; gates every outgoing call.
+  private final CountDownLatch registrationLatch = new CountDownLatch(1);
+
+  @Inject
+  VersionedSchedulerDriverService(
+      Storage storage,
+      DriverSettings settings,
+      Scheduler scheduler,
+      VersionedDriverFactory factory) {
+    this.storage = requireNonNull(storage);
+    this.driverSettings = requireNonNull(settings);
+    this.scheduler = requireNonNull(scheduler);
+    this.factory = requireNonNull(factory);
+  }
+
+  /**
+   * Reads the persisted framework ID from storage.
+   *
+   * <p>The stored value is unwrapped without a presence check, so this must only be called from
+   * code that runs after registration (see {@link #whenRegistered(Command)}).
+   */
+  private FrameworkID getFrameworkId() {
+    String id = storage.read(storeProvider ->
+        storeProvider.getSchedulerStore().fetchFrameworkId().get());
+    return FrameworkID.newBuilder().setValue(id).build();
+  }
+
+  /**
+   * Creates the {@link Mesos} handle through the factory and publishes it via
+   * {@code mesosFuture}. A persisted framework ID, when present, is set on the framework info
+   * passed to the factory.
+   */
+  @Override
+  protected void startUp() throws Exception {
+    Optional<String> frameworkId = storage.read(
+        storeProvider -> storeProvider.getSchedulerStore().fetchFrameworkId());
+
+    LOG.info("Connecting to mesos master: {}", driverSettings.getMasterUri());
+    if (!driverSettings.getCredentials().isPresent()) {
+      LOG.warn("Connecting to master without authentication!");
+    }
+
+    FrameworkInfo.Builder frameworkBuilder = driverSettings.getFrameworkInfo().toBuilder();
+
+    if (frameworkId.isPresent()) {
+      // Log the ID itself rather than the Optional wrapper around it.
+      LOG.info("Found persisted framework ID: {}", frameworkId.get());
+      frameworkBuilder.setId(FrameworkID.newBuilder().setValue(frameworkId.get()));
+    } else {
+      LOG.warn("Did not find a persisted framework ID, connecting as a new framework.");
+    }
+
+    Credential credential = driverSettings.getCredentials().orNull();
+    Mesos mesos = factory.create(
+        scheduler,
+        frameworkBuilder.build(),
+        driverSettings.getMasterUri(),
+        Optional.fromNullable(credential));
+
+    mesosFuture.set(mesos);
+  }
+
+  /**
+   * Releases any thread waiting in {@link #blockUntilStopped()}.
+   */
+  @Override
+  protected void shutDown() throws Exception {
+    terminationLatch.countDown();
+  }
+
+  /**
+   * Sends an ACCEPT call for a single offer with the given operations and filters.
+   * Blocks until the driver is registered.
+   */
+  @Override
+  public void acceptOffers(OfferID offerId, Collection<Operation> operations, Filters filter) {
+    whenRegistered(() -> {
+      LOG.info("Accepting offer {} with ops {}", offerId, operations);
+
+      Futures.getUnchecked(mesosFuture).send(
+          Call.newBuilder()
+              .setFrameworkId(getFrameworkId())
+              .setType(Call.Type.ACCEPT)
+              .setAccept(
+                  Call.Accept.newBuilder()
+                      .addOfferIds(offerId)
+                      .addAllOperations(operations)
+                      .setFilters(filter))
+              .build());
+    });
+  }
+
+  /**
+   * Sends a DECLINE call for a single offer with the given filters.
+   * Blocks until the driver is registered.
+   */
+  @Override
+  public void declineOffer(OfferID offerId, Filters filter) {
+    whenRegistered(() -> {
+      LOG.info("Declining offer {}", offerId.getValue());
+
+      Futures.getUnchecked(mesosFuture).send(
+          Call.newBuilder().setType(Call.Type.DECLINE)
+              .setFrameworkId(getFrameworkId())
+              .setDecline(
+                  Call.Decline.newBuilder()
+                      .setFilters(filter)
+                      .addOfferIds(offerId))
+              .build()
+      );
+    });
+  }
+
+  /**
+   * Sends a KILL call for the task with the given ID.
+   * Blocks until the driver is registered.
+   */
+  @Override
+  public void killTask(String taskId) {
+    whenRegistered(() -> {
+      LOG.info("Killing task {}", taskId);
+
+      Futures.getUnchecked(mesosFuture).send(
+          Call.newBuilder().setType(Call.Type.KILL)
+              .setFrameworkId(getFrameworkId())
+              .setKill(
+                  Call.Kill.newBuilder()
+                      .setTaskId(TaskID.newBuilder().setValue(taskId)))
+              .build()
+      );
+    });
+  }
+
+  /**
+   * Sends an ACKNOWLEDGE call for the given status update. Updates that carry no UUID are
+   * dropped without sending anything. Blocks until the driver is registered.
+   */
+  @Override
+  public void acknowledgeStatusUpdate(TaskStatus status) {
+    // The Mesos API says frameworks are only supposed to acknowledge status updates
+    // with a UUID. The V0Driver accepts them just fine but the V1Driver logs every time
+    // a status update is given without a uuid. To silence logs, we drop them here.
+
+    whenRegistered(() -> {
+      if (!status.hasUuid()) {
+        return;
+      }
+
+      LOG.info("Acking status update for {} with uuid: {}",
+          status.getTaskId().getValue(),
+          status.getUuid());
+
+      Futures.getUnchecked(mesosFuture).send(
+          Call.newBuilder().setType(Call.Type.ACKNOWLEDGE)
+              .setFrameworkId(getFrameworkId())
+              .setAcknowledge(
+                  Call.Acknowledge.newBuilder()
+                      .setAgentId(status.getAgentId())
+                      .setTaskId(status.getTaskId())
+                      .setUuid(status.getUuid()))
+              .build()
+      );
+    });
+  }
+
+  /**
+   * Sends a RECONCILE call containing the task ID of each given status.
+   * Blocks until the driver is registered.
+   */
+  @Override
+  public void reconcileTasks(Collection<TaskStatus> statuses) {
+    whenRegistered(() -> {
+      Collection<Call.Reconcile.Task> tasks = Collections2.transform(statuses, taskStatus ->
+          Call.Reconcile.Task.newBuilder()
+              .setTaskId(taskStatus.getTaskId())
+              .build());
+
+      Futures.getUnchecked(mesosFuture).send(
+          Call.newBuilder()
+              .setType(Call.Type.RECONCILE)
+              .setFrameworkId(getFrameworkId())
+              .setReconcile(
+                  Call.Reconcile.newBuilder()
+                      .addAllTasks(tasks))
+              .build()
+      );
+    });
+  }
+
+  /**
+   * Blocks the calling thread until {@link #shutDown()} or {@link #abort()} releases the
+   * termination latch.
+   *
+   * @throws IllegalStateException if the service is not running.
+   * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
+   *     interrupted; the thread's interrupt flag is restored before throwing.
+   */
+  @Override
+  public void blockUntilStopped() {
+    ensureRunning();
+    try {
+      terminationLatch.await();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so code further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Releases any thread waiting in {@link #blockUntilStopped()} and asks the service to stop.
+   */
+  @Override
+  public void abort() {
+    terminationLatch.countDown();
+    stopAsync();
+  }
+
+  /**
+   * Event bus callback: unblocks all calls waiting for registration.
+   */
+  @Subscribe
+  public void registered(DriverRegistered event) {
+    registrationLatch.countDown();
+  }
+
+  /**
+   * Runs the given command on the calling thread once the driver has registered.
+   *
+   * @param c the work to run after registration.
+   * @throws IllegalStateException if the service is not running.
+   * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
+   *     interrupted; the thread's interrupt flag is restored before throwing.
+   */
+  private void whenRegistered(Command c) {
+    ensureRunning();
+    // We need to block until registered because that's when we are guaranteed to have our
+    // framework id. Without it, we cannot construct any Call objects.
+    try {
+      registrationLatch.await();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so code further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+    c.execute();
+  }
+
+  private void ensureRunning() {
+    checkState(isRunning(), "Driver is not running.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 0551804..f579975 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.hash.Hashing;
+import com.google.common.net.InetAddresses;
 import com.google.common.util.concurrent.Atomics;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -116,6 +117,10 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
   private static final String SERVERSET_PATH = "/fake/service/path";
   private static final String STATS_URL_PREFIX = "fake_url";
   private static final String FRAMEWORK_ID = "integration_test_framework_id";
+  private static final Protos.MasterInfo MASTER = Protos.MasterInfo.newBuilder()
+      .setId("master-id")
+      .setIp(InetAddresses.coerceToInteger(InetAddresses.forString("1.2.3.4"))) //NOPMD
+      .setPort(5050).build();
   private static final IHostAttributes HOST_ATTRIBUTES = IHostAttributes.build(new HostAttributes()
       .setHost("host")
       .setSlaveId("slave-id")
@@ -336,7 +341,7 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
     scheduler.getValue().registered(
         driver,
         Protos.FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
-        Protos.MasterInfo.getDefaultInstance());
+        MASTER);
 
     awaitSchedulerReady();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
new file mode 100644
index 0000000..80f631e
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
@@ -0,0 +1,430 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.net.InetAddresses;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.aurora.common.application.Lifecycle;
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.TaskStatusHandler;
+import org.apache.aurora.scheduler.base.Conversions;
+import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.mesos.v1.Protos;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link MesosCallbackHandler.MesosCallbackHandlerImpl}.
+ *
+ * <p>Collaborators are EasyMock mocks (storage is supplied by {@link StorageTestUtil}), the
+ * handler is given a direct executor, and counters are recorded in a {@link FakeStatsProvider}
+ * so that tests can assert on their values.
+ */
+public class MesosCallbackHandlerTest extends EasyMockTest {
+  private static final Protos.AgentID AGENT_ID =
+      Protos.AgentID.newBuilder().setValue("agent-id").build();
+
+  private static final String MASTER_ID = "master-id";
+  private static final Protos.MasterInfo MASTER = Protos.MasterInfo.newBuilder()
+      .setId(MASTER_ID)
+      .setIp(InetAddresses.coerceToInteger(InetAddresses.forString("1.2.3.4"))) //NOPMD
+      .setPort(5050).build();
+
+  private static final Protos.ExecutorID EXECUTOR_ID =
+      Protos.ExecutorID.newBuilder().setValue("executor-id").build();
+
+  private static final String FRAMEWORK_ID = "framework-id";
+  private static final Protos.FrameworkID FRAMEWORK =
+      Protos.FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build();
+
+  private static final String AGENT_HOST = "agent-hostname";
+
+  // First offer fixture: OFFER from AGENT_ID, paired with host attributes in mode NONE.
+  private static final Protos.OfferID OFFER_ID =
+      Protos.OfferID.newBuilder().setValue("offer-id").build();
+  private static final Protos.Offer OFFER = Protos.Offer.newBuilder()
+      .setFrameworkId(FRAMEWORK)
+      .setAgentId(AGENT_ID)
+      .setHostname(AGENT_HOST)
+      .setId(OFFER_ID)
+      .build();
+
+  private static final HostOffer HOST_OFFER = new HostOffer(
+      OFFER,
+      IHostAttributes.build(
+          new HostAttributes()
+              .setHost(AGENT_HOST)
+              .setSlaveId(AGENT_ID.getValue())
+              .setMode(NONE)
+              .setAttributes(ImmutableSet.of())));
+
+  // Second offer fixture, from a different agent and host, also in mode NONE.
+  private static final Protos.AgentID AGENT_ID_2 =
+      Protos.AgentID.newBuilder().setValue("agent-id2").build();
+  private static final String AGENT_HOST_2 = "agent2-hostname";
+  private static final Protos.OfferID OFFER_ID_2 =
+      Protos.OfferID.newBuilder().setValue("offer-id2").build();
+  private static final Protos.Offer OFFER_2 = Protos.Offer.newBuilder()
+      .setFrameworkId(FRAMEWORK)
+      .setAgentId(AGENT_ID_2)
+      .setHostname(AGENT_HOST_2)
+      .setId(OFFER_ID_2)
+      .build();
+
+  private static final HostOffer HOST_OFFER_2 = new HostOffer(
+      OFFER_2,
+      IHostAttributes.build(
+          new HostAttributes()
+              .setHost(AGENT_HOST_2)
+              .setSlaveId(AGENT_ID_2.getValue())
+              .setMode(NONE)
+              .setAttributes(ImmutableSet.of())));
+
+  // Same offer as HOST_OFFER, but with the host attributes in DRAINING mode.
+  private static final HostOffer DRAINING_HOST_OFFER = new HostOffer(
+      OFFER,
+      IHostAttributes.build(new HostAttributes()
+          .setHost(AGENT_HOST)
+          .setSlaveId(AGENT_ID.getValue())
+          .setMode(DRAINING)
+          .setAttributes(ImmutableSet.of())));
+
+  // Base status; the other status fixtures are derived from it by setting a reason.
+  private static final Protos.TaskStatus STATUS_NO_REASON = Protos.TaskStatus.newBuilder()
+      .setState(Protos.TaskState.TASK_RUNNING)
+      .setSource(Protos.TaskStatus.Source.SOURCE_AGENT)
+      .setMessage("message")
+      .setTimestamp(1D)
+      .setTaskId(Protos.TaskID.newBuilder().setValue("task-id").build())
+      .build();
+
+  private static final Protos.TaskStatus STATUS = STATUS_NO_REASON
+      .toBuilder()
+      // Only testing data plumbing, this field with TASK_RUNNING would not normally happen,
+      .setReason(Protos.TaskStatus.Reason.REASON_COMMAND_EXECUTOR_FAILED)
+      .build();
+
+  private static final Protos.TaskStatus STATUS_RECONCILIATION = STATUS_NO_REASON
+      .toBuilder()
+      .setReason(Protos.TaskStatus.Reason.REASON_RECONCILIATION)
+      .build();
+
+  // Collaborators handed to the handler under test; (re)created by setUp()/createHandler().
+  private StorageTestUtil storageUtil;
+  private Command shutdownCommand;
+  private TaskStatusHandler statusHandler;
+  private OfferManager offerManager;
+  private EventSink eventSink;
+  private FakeStatsProvider statsProvider;
+  private Logger injectedLog;
+
+  private MesosCallbackHandler handler;
+
+  /**
+   * Creates fresh mocks and a handler wired with a real (non-mock) logger.
+   */
+  @Before
+  public void setUp() {
+
+    storageUtil = new StorageTestUtil(this);
+    shutdownCommand = createMock(Command.class);
+    statusHandler = createMock(TaskStatusHandler.class);
+    offerManager = createMock(OfferManager.class);
+    eventSink = createMock(EventSink.class);
+    statsProvider = new FakeStatsProvider();
+
+    createHandler(false);
+  }
+
+  /**
+   * Builds the handler under test from the current collaborator fields.
+   *
+   * @param mockLogger when {@code true} a mock {@link Logger} is injected so log calls can be
+   *     verified; otherwise a real SLF4J logger is used.
+   */
+  private void createHandler(boolean mockLogger) {
+    if (mockLogger) {
+      injectedLog = createMock(Logger.class);
+    } else {
+      injectedLog = LoggerFactory.getLogger("MesosCallbackHandlerTestLogger");
+    }
+
+    handler = new MesosCallbackHandler.MesosCallbackHandlerImpl(
+        storageUtil.storage,
+        new Lifecycle(shutdownCommand), // Cannot mock lifecycle
+        statusHandler,
+        offerManager,
+        eventSink,
+        MoreExecutors.directExecutor(),
+        injectedLog,
+        statsProvider);
+
+  }
+
+  /**
+   * Registration must save the framework ID to the scheduler store and post a
+   * {@code DriverRegistered} event.
+   */
+  @Test
+  public void testRegistration() {
+
+    storageUtil.expectOperations();
+
+    storageUtil.schedulerStore.saveFrameworkId(FRAMEWORK_ID);
+    expectLastCall();
+
+    eventSink.post(new PubsubEvent.DriverRegistered());
+
+    control.replay();
+
+    handler.handleRegistration(FRAMEWORK, MASTER);
+  }
+
+  /**
+   * Re-registration records no mock expectations and bumps the re-register counter to 1.
+   */
+  @Test
+  public void testReRegistration() {
+    control.replay();
+
+    handler.handleReregistration(MASTER);
+    assertEquals(1L, statsProvider.getLongValue("scheduler_framework_reregisters"));
+  }
+
+  /**
+   * Handling an empty offer list; no expectations are recorded on any collaborator.
+   */
+  @Test
+  public void testGetEmptyOfferList() {
+    control.replay();
+
+    handler.handleOffers(ImmutableList.of());
+  }
+
+  /**
+   * Records the storage expectations for an offer from a host with no stored attributes: the
+   * lookup returns absent, and the attributes derived from the offer are saved with mode NONE.
+   */
+  private void expectOfferAttributesSaved(HostOffer offer) {
+    expect(storageUtil.attributeStore.getHostAttributes(offer.getOffer().getHostname()))
+        .andReturn(Optional.absent());
+    IHostAttributes defaultMode = IHostAttributes.build(
+        Conversions.getAttributes(offer.getOffer()).newBuilder().setMode(NONE));
+    expect(storageUtil.attributeStore.saveHostAttributes(defaultMode)).andReturn(true);
+  }
+
+  /**
+   * A single offer has its attributes saved, is passed to the offer manager, and is counted.
+   */
+  @Test
+  public void testOffers() {
+    storageUtil.expectOperations();
+    expectOfferAttributesSaved(HOST_OFFER);
+    offerManager.addOffer(HOST_OFFER);
+
+    control.replay();
+
+    handler.handleOffers(ImmutableList.of(HOST_OFFER.getOffer()));
+    assertEquals(1L, statsProvider.getLongValue("scheduler_resource_offers"));
+  }
+
+  /**
+   * Two offers delivered together are each saved, added to the offer manager, and counted.
+   */
+  @Test
+  public void testMultipleOffers() {
+    storageUtil.expectOperations();
+    expectOfferAttributesSaved(HOST_OFFER);
+    expectOfferAttributesSaved(HOST_OFFER_2);
+    offerManager.addOffer(HOST_OFFER);
+    offerManager.addOffer(HOST_OFFER_2);
+
+    control.replay();
+
+    handler.handleOffers(ImmutableList.of(HOST_OFFER.getOffer(), HOST_OFFER_2.getOffer()));
+    assertEquals(2L, statsProvider.getLongValue("scheduler_resource_offers"));
+  }
+
+  /**
+   * When the stored attributes for the host are in DRAINING mode, the saved attributes and the
+   * offer given to the offer manager must both carry DRAINING.
+   */
+  @Test
+  public void testModePreservedWhenOfferAdded() {
+    storageUtil.expectOperations();
+
+    IHostAttributes draining =
+        IHostAttributes.build(HOST_OFFER.getAttributes().newBuilder().setMode(DRAINING));
+    expect(storageUtil.attributeStore.getHostAttributes(AGENT_HOST))
+        .andReturn(Optional.of(draining));
+
+    IHostAttributes saved = IHostAttributes.build(
+        Conversions.getAttributes(HOST_OFFER.getOffer()).newBuilder().setMode(DRAINING));
+
+    expect(storageUtil.attributeStore.saveHostAttributes(saved)).andReturn(true);
+
+    // If the host is in draining, then the offer manager should get an offer with that attribute
+    offerManager.addOffer(DRAINING_HOST_OFFER);
+
+    control.replay();
+    handler.handleOffers(ImmutableList.of(HOST_OFFER.getOffer()));
+    assertEquals(1L, statsProvider.getLongValue("scheduler_resource_offers"));
+
+  }
+
+  /**
+   * Disconnection posts a {@code DriverDisconnected} event and bumps the disconnect counter.
+   */
+  @Test
+  public void testDisconnection() {
+    eventSink.post(new PubsubEvent.DriverDisconnected());
+
+    control.replay();
+
+    handler.handleDisconnection();
+    assertEquals(1L, statsProvider.getLongValue("scheduler_framework_disconnects"));
+  }
+
+  /**
+   * A rescinded offer is cancelled in the offer manager and counted.
+   */
+  @Test
+  public void testRescind() {
+    offerManager.cancelOffer(OFFER_ID);
+
+    control.replay();
+
+    handler.handleRescind(OFFER_ID);
+    assertEquals(1L, statsProvider.getLongValue("offers_rescinded"));
+  }
+
+  /**
+   * An error from Mesos must run the shutdown command wrapped by the lifecycle.
+   */
+  @Test
+  public void testError() {
+    shutdownCommand.execute();
+    expectLastCall();
+
+    control.replay();
+
+    handler.handleError("Something bad happened!");
+  }
+
+  /**
+   * A status update posts a {@code TaskStatusReceived} event and is forwarded to the status
+   * handler.
+   */
+  @Test
+  public void testUpdate() {
+    eventSink.post(new PubsubEvent.TaskStatusReceived(
+        STATUS.getState(),
+        Optional.fromNullable(STATUS.getSource()),
+        Optional.fromNullable(STATUS.getReason()),
+        // NOTE(review): presumably the fixture timestamp of 1.0 expressed in microseconds;
+        // confirm against the handler's conversion.
+        Optional.of(1000000L)
+    ));
+    statusHandler.statusUpdate(STATUS);
+
+    control.replay();
+
+    handler.handleUpdate(STATUS);
+  }
+
+  /**
+   * With the source cleared from the status, the posted event must carry an absent source.
+   */
+  @Test
+  public void testUpdateNoSource() {
+    Protos.TaskStatus status = STATUS.toBuilder().clearSource().build();
+
+    eventSink.post(new PubsubEvent.TaskStatusReceived(
+        status.getState(),
+        Optional.absent(),
+        Optional.fromNullable(status.getReason()),
+        Optional.of(1000000L)
+    ));
+    statusHandler.statusUpdate(status);
+
+    control.replay();
+
+    handler.handleUpdate(status);
+  }
+
+  /**
+   * With the reason cleared from the status, the posted event must carry an absent reason.
+   */
+  @Test
+  public void testUpdateNoReason() {
+    Protos.TaskStatus status = STATUS.toBuilder().clearReason().build();
+
+    eventSink.post(new PubsubEvent.TaskStatusReceived(
+        status.getState(),
+        Optional.fromNullable(status.getSource()),
+        Optional.absent(),
+        Optional.of(1000000L)
+    ));
+    statusHandler.statusUpdate(status);
+
+    control.replay();
+
+    handler.handleUpdate(status);
+  }
+
+  /**
+   * A status without a message is still posted as an event and forwarded to the status handler.
+   */
+  @Test
+  public void testUpdateNoMessage() {
+    Protos.TaskStatus status = STATUS.toBuilder().clearMessage().build();
+
+    eventSink.post(new PubsubEvent.TaskStatusReceived(
+        status.getState(),
+        Optional.fromNullable(status.getSource()),
+        Optional.fromNullable(status.getReason()),
+        Optional.of(1000000L)
+    ));
+    statusHandler.statusUpdate(status);
+
+    control.replay();
+
+    handler.handleUpdate(status);
+  }
+
+  /**
+   * A {@code StorageException} thrown by the status handler must surface from
+   * {@code handleUpdate} as a {@link SchedulerException}.
+   */
+  @Test(expected = SchedulerException.class)
+  public void testUpdateWithException() {
+    eventSink.post(new PubsubEvent.TaskStatusReceived(
+        STATUS.getState(),
+        Optional.fromNullable(STATUS.getSource()),
+        Optional.fromNullable(STATUS.getReason()),
+        Optional.of(1000000L)
+    ));
+    statusHandler.statusUpdate(STATUS);
+    expectLastCall().andThrow(new Storage.StorageException("Storage Failure"));
+
+    control.replay();
+
+    handler.handleUpdate(STATUS);
+  }
+
+  /**
+   * A status update whose reason is REASON_RECONCILIATION must be logged exactly once at debug
+   * level with the expected message; the handler is rebuilt with a mock logger to verify this.
+   */
+  @Test
+  public void testReconciliationUpdateLogging() {
+    // Mock the logger so we can test that it is logged at debug
+    createHandler(true);
+    String expectedMsg = "Received status update for task task-id in state TASK_RUNNING from "
+        + "SOURCE_AGENT with REASON_RECONCILIATION: message";
+
+    injectedLog.debug(expectedMsg);
+    expectLastCall().once();
+
+    eventSink.post(new PubsubEvent.TaskStatusReceived(
+        STATUS_RECONCILIATION.getState(),
+        Optional.fromNullable(STATUS_RECONCILIATION.getSource()),
+        Optional.fromNullable(STATUS_RECONCILIATION.getReason()),
+        Optional.of(1000000L)
+    ));
+
+    statusHandler.statusUpdate(STATUS_RECONCILIATION);
+
+    control.replay();
+
+    handler.handleUpdate(STATUS_RECONCILIATION);
+  }
+
+  /**
+   * A lost agent bumps the {@code slaves_lost} counter.
+   */
+  @Test
+  public void testLostAgent() {
+    control.replay();
+
+    handler.handleLostAgent(AGENT_ID);
+    assertEquals(1L, statsProvider.getLongValue("slaves_lost"));
+  }
+
+  /**
+   * A lost executor reported with status 0 must not be counted.
+   */
+  @Test
+  public void testLostExecutorIgnoresOkStatus() {
+    control.replay();
+
+    handler.handleLostExecutor(EXECUTOR_ID, AGENT_ID, 0);
+    assertEquals(0L, statsProvider.getLongValue("scheduler_lost_executors"));
+  }
+
+  /**
+   * A lost executor reported with status 1 bumps the lost-executor counter.
+   */
+  @Test
+  public void testLostExecutor() {
+    control.replay();
+
+    handler.handleLostExecutor(EXECUTOR_ID, AGENT_ID, 1);
+    assertEquals(1L, statsProvider.getLongValue("scheduler_lost_executors"));
+  }
+
+  /**
+   * Handling a framework message; no expectations are recorded on any collaborator.
+   */
+  @Test
+  public void testMessage() {
+    // Framework messages should be ignored.
+    control.replay();
+
+    handler.handleMessage(EXECUTOR_ID, AGENT_ID);
+  }
+}