You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2017/03/02 23:07:33 UTC
[1/2] aurora git commit: Enable Mesos HTTP API.
Repository: aurora
Updated Branches:
refs/heads/master 2652fe02a -> 705dbc7cd
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
index c599fe3..b132cde 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
@@ -14,33 +14,14 @@
package org.apache.aurora.scheduler.mesos;
import java.nio.charset.StandardCharsets;
-import java.util.concurrent.atomic.AtomicBoolean;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.net.InetAddresses;
-import org.apache.aurora.common.application.Lifecycle;
-import org.apache.aurora.common.base.Command;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.TaskStatusHandler;
-import org.apache.aurora.scheduler.base.Conversions;
-import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStatusReceived;
-import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;
+import org.apache.mesos.v1.Protos.ExecutorID;
import org.apache.mesos.v1.Protos.FrameworkID;
import org.apache.mesos.v1.Protos.OfferID;
import org.apache.mesos.v1.Protos.TaskID;
@@ -48,64 +29,36 @@ import org.apache.mesos.v1.Protos.TaskState;
import org.apache.mesos.v1.Protos.TaskStatus;
import org.apache.mesos.v1.Protos.TaskStatus.Reason;
import org.apache.mesos.v1.Protos.TaskStatus.Source;
-import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
-import static org.apache.aurora.gen.MaintenanceMode.NONE;
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.expect;
+import static org.apache.aurora.scheduler.mesos.ProtosConversion.convert;
import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
public class MesosSchedulerImplTest extends EasyMockTest {
private static final String FRAMEWORK_ID = "framework-id";
private static final FrameworkID FRAMEWORK =
FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build();
-
+ private static final String MASTER_ID = "master-id";
+ private static final Protos.MasterInfo MASTER = Protos.MasterInfo.newBuilder()
+ .setId(MASTER_ID)
+ .setIp(InetAddresses.coerceToInteger(InetAddresses.forString("1.2.3.4"))) //NOPMD
+ .setPort(5050).build();
private static final String SLAVE_HOST = "slave-hostname";
private static final Protos.SlaveID SLAVE_ID =
Protos.SlaveID.newBuilder().setValue("slave-id").build();
- private static final String SLAVE_HOST_2 = "slave-hostname-2";
- private static final Protos.SlaveID SLAVE_ID_2 =
- Protos.SlaveID.newBuilder().setValue("slave-id-2").build();
- private static final Protos.ExecutorID EXECUTOR_ID =
- Protos.ExecutorID.newBuilder().setValue("executor-id").build();
private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("offer-id").build();
private static final Protos.Offer OFFER = Protos.Offer.newBuilder()
- .setFrameworkId(ProtosConversion.convert(FRAMEWORK))
+ .setFrameworkId(convert(FRAMEWORK))
.setSlaveId(SLAVE_ID)
.setHostname(SLAVE_HOST)
- .setId(ProtosConversion.convert(OFFER_ID))
- .build();
- private static final HostOffer HOST_OFFER = new HostOffer(
- ProtosConversion.convert(OFFER),
- IHostAttributes.build(
- new HostAttributes()
- .setHost(SLAVE_HOST)
- .setSlaveId(SLAVE_ID.getValue())
- .setMode(NONE)
- .setAttributes(ImmutableSet.of())));
- private static final OfferID OFFER_ID_2 = OfferID.newBuilder().setValue("offer-id-2").build();
- private static final Protos.Offer OFFER_2 = Protos.Offer.newBuilder(OFFER)
- .setSlaveId(SLAVE_ID_2)
- .setHostname(SLAVE_HOST_2)
- .setId(ProtosConversion.convert(OFFER_ID_2))
+ .setId(convert(OFFER_ID))
.build();
- private static final HostOffer HOST_OFFER_2 = new HostOffer(
- ProtosConversion.convert(OFFER_2),
- IHostAttributes.build(
- new HostAttributes()
- .setHost(SLAVE_HOST_2)
- .setSlaveId(SLAVE_ID_2.getValue())
- .setMode(NONE)
- .setAttributes(ImmutableSet.of())));
+
+ private static final ExecutorID EXECUTOR_ID =
+ ExecutorID.newBuilder().setValue("executor-id").build();
private static final TaskStatus STATUS_NO_REASON = TaskStatus.newBuilder()
.setState(TaskState.TASK_RUNNING)
@@ -121,358 +74,129 @@ public class MesosSchedulerImplTest extends EasyMockTest {
.setReason(Reason.REASON_COMMAND_EXECUTOR_FAILED)
.build();
- private static final TaskStatus STATUS_RECONCILIATION = STATUS_NO_REASON
- .toBuilder()
- .setReason(Reason.REASON_RECONCILIATION)
- .build();
-
- private static final TaskStatusReceived PUBSUB_RECONCILIATION_EVENT = new TaskStatusReceived(
- STATUS_RECONCILIATION.getState(),
- Optional.of(STATUS_RECONCILIATION.getSource()),
- Optional.of(STATUS_RECONCILIATION.getReason()),
- Optional.of(1000000L)
- );
-
- private StorageTestUtil storageUtil;
- private Command shutdownCommand;
- private TaskStatusHandler statusHandler;
- private OfferManager offerManager;
+ private MesosCallbackHandler handler;
private SchedulerDriver driver;
- private EventSink eventSink;
- private FakeStatsProvider statsProvider;
private MesosSchedulerImpl scheduler;
@Before
public void setUp() {
- Logger log = LoggerFactory.getLogger("");
- initializeScheduler(log);
- }
-
- private void initializeScheduler(Logger logger) {
- storageUtil = new StorageTestUtil(this);
- shutdownCommand = createMock(Command.class);
- statusHandler = createMock(TaskStatusHandler.class);
- offerManager = createMock(OfferManager.class);
- eventSink = createMock(EventSink.class);
- statsProvider = new FakeStatsProvider();
-
- scheduler = new MesosSchedulerImpl(
- storageUtil.storage,
- new Lifecycle(shutdownCommand),
- statusHandler,
- offerManager,
- eventSink,
- MoreExecutors.sameThreadExecutor(),
- new CachedCounters(statsProvider),
- logger,
- statsProvider);
+ handler = createMock(MesosCallbackHandler.class);
driver = createMock(SchedulerDriver.class);
+ scheduler = new MesosSchedulerImpl(handler);
}
- @Test(expected = IllegalStateException.class)
- public void testBadOrdering() {
- control.replay();
+ @Test
+ public void testSlaveLost() {
+ handler.handleLostAgent(convert(SLAVE_ID));
+ expectLastCall().once();
- // Should fail since the scheduler is not yet registered.
- scheduler.resourceOffers(driver, ImmutableList.of());
- }
+ control.replay();
- @Test
- public void testNoOffers() {
- new AbstractRegisteredTest() {
- @Override
- void test() {
- scheduler.resourceOffers(driver, ImmutableList.of());
- }
- }.run();
+ scheduler.slaveLost(driver, SLAVE_ID);
}
@Test
- public void testAcceptOffer() {
- new AbstractOfferTest() {
- @Override
- void respondToOffer() {
- expectOfferAttributesSaved(HOST_OFFER);
- offerManager.addOffer(HOST_OFFER);
- }
- }.run();
- }
+ public void testRegistered() {
+ handler.handleRegistration(FRAMEWORK, convert(MASTER));
+ expectLastCall().once();
- @Test
- public void testAcceptOfferDebugLogging() {
- Logger mockLogger = createMock(Logger.class);
- mockLogger.info(anyString());
- mockLogger.debug(anyString(), EasyMock.<Object>anyObject());
- initializeScheduler(mockLogger);
-
- new AbstractOfferTest() {
- @Override
- void respondToOffer() {
- expectOfferAttributesSaved(HOST_OFFER);
- offerManager.addOffer(HOST_OFFER);
- }
- }.run();
- }
+ control.replay();
- @Test
- public void testAttributesModePreserved() {
- new AbstractOfferTest() {
- @Override
- void respondToOffer() {
- IHostAttributes draining =
- IHostAttributes.build(HOST_OFFER.getAttributes().newBuilder().setMode(DRAINING));
- expect(storageUtil.attributeStore.getHostAttributes(HOST_OFFER.getOffer().getHostname()))
- .andReturn(Optional.of(draining));
- IHostAttributes saved = IHostAttributes.build(
- Conversions.getAttributes(HOST_OFFER.getOffer()).newBuilder().setMode(DRAINING));
- expect(storageUtil.attributeStore.saveHostAttributes(saved)).andReturn(true);
-
- HostOffer offer = new HostOffer(HOST_OFFER.getOffer(), draining);
- offerManager.addOffer(offer);
- }
- }.run();
+ scheduler.registered(driver, convert(FRAMEWORK), MASTER);
}
@Test
- public void testStatusUpdate() {
- // Test multiple variations of fields in TaskStatus to cover all branches.
- new StatusUpdater(STATUS).run();
- control.verify();
- control.reset();
- new StatusUpdater(STATUS.toBuilder().clearSource().build()).run();
- control.verify();
- control.reset();
- new StatusUpdater(STATUS.toBuilder().clearReason().build()).run();
- control.verify();
- control.reset();
- new StatusUpdater(STATUS.toBuilder().clearMessage().build()).run();
- }
+ public void testDisconnected() {
+ handler.handleDisconnection();
+ expectLastCall().once();
- @Test(expected = SchedulerException.class)
- public void testStatusUpdateFails() {
- new AbstractStatusTest() {
- @Override
- void expectations() {
- eventSink.post(new TaskStatusReceived(
- STATUS.getState(),
- Optional.of(STATUS.getSource()),
- Optional.of(STATUS.getReason()),
- Optional.of(1000000L)
- ));
- statusHandler.statusUpdate(status);
- expectLastCall().andThrow(new StorageException("Injected."));
- }
- }.run();
- }
+ control.replay();
- @Test
- public void testMultipleOffers() {
- new AbstractRegisteredTest() {
- @Override
- void expectations() {
- expectOfferAttributesSaved(HOST_OFFER);
- expectOfferAttributesSaved(HOST_OFFER_2);
- offerManager.addOffer(HOST_OFFER);
- offerManager.addOffer(HOST_OFFER_2);
- }
-
- @Override
- void test() {
- scheduler.resourceOffers(driver,
- ImmutableList.of(
- ProtosConversion.convert(HOST_OFFER.getOffer()),
- ProtosConversion.convert(HOST_OFFER_2.getOffer())));
- }
- }.run();
+ scheduler.disconnected(driver);
}
@Test
- public void testDisconnected() {
- new AbstractRegisteredTest() {
- @Override
- void expectations() {
- eventSink.post(new DriverDisconnected());
- }
-
- @Override
- void test() {
- scheduler.disconnected(driver);
- assertEquals(1L, statsProvider.getLongValue("scheduler_framework_disconnects"));
- }
- }.run();
- }
+ public void testReRegistered() {
+ handler.handleReregistration(convert(MASTER));
+ expectLastCall().once();
- @Test
- public void testFrameworkMessageIgnored() {
control.replay();
- scheduler.frameworkMessage(
- driver,
- EXECUTOR_ID,
- SLAVE_ID,
- "hello".getBytes(StandardCharsets.UTF_8));
+ scheduler.reregistered(driver, MASTER);
}
@Test
- public void testSlaveLost() {
+ public void testResourceOffers() {
+ handler.handleRegistration(FRAMEWORK, convert(MASTER));
+ expectLastCall().once();
+ handler.handleOffers(ImmutableList.of(convert(OFFER)));
+ expectLastCall().once();
+
control.replay();
- scheduler.slaveLost(driver, SLAVE_ID);
- assertEquals(1L, statsProvider.getLongValue("slaves_lost"));
+ scheduler.registered(driver, convert(FRAMEWORK), MASTER);
+ scheduler.resourceOffers(driver, ImmutableList.of(OFFER));
}
- @Test
- public void testReregistered() {
+ @Test(expected = IllegalStateException.class)
+ public void testBadOrdering() {
control.replay();
- scheduler.reregistered(driver, Protos.MasterInfo.getDefaultInstance());
+ // Should fail since the scheduler is not yet registered.
+ scheduler.resourceOffers(driver, ImmutableList.of());
}
@Test
public void testOfferRescinded() {
- offerManager.cancelOffer(OFFER_ID);
+ handler.handleRescind(OFFER_ID);
+ expectLastCall().once();
control.replay();
- scheduler.offerRescinded(driver, ProtosConversion.convert(OFFER_ID));
- assertEquals(1L, statsProvider.getLongValue("offers_rescinded"));
+ scheduler.offerRescinded(driver, convert(OFFER_ID));
}
@Test
- public void testError() {
- shutdownCommand.execute();
+ public void testStatusUpdate() {
+ handler.handleUpdate(STATUS);
+ expectLastCall().once();
control.replay();
- scheduler.error(driver, "error");
+ scheduler.statusUpdate(driver, convert(STATUS));
}
@Test
- public void testExecutorLost() {
+ public void testError() {
+ handler.handleError("Oh No!");
+ expectLastCall().once();
+
control.replay();
- scheduler.executorLost(driver, EXECUTOR_ID, SLAVE_ID, 1);
+ scheduler.error(driver, "Oh No!");
}
@Test
- public void testStatusReconciliationAcceptsDebugLogging() {
- Logger mockLogger = createMock(Logger.class);
- mockLogger.info(anyString());
- mockLogger.debug(anyString());
- initializeScheduler(mockLogger);
-
- new AbstractStatusReconciliationTest() {
- @Override
- void expectations() {
- eventSink.post(PUBSUB_RECONCILIATION_EVENT);
- statusHandler.statusUpdate(status);
- }
- }.run();
- }
-
- private class StatusUpdater extends AbstractStatusTest {
- StatusUpdater(TaskStatus status) {
- super(status);
- }
-
- @Override
- void expectations() {
- eventSink.post(new TaskStatusReceived(
- status.getState(),
- Optional.fromNullable(status.getSource()),
- status.hasReason() ? Optional.of(status.getReason()) : Optional.absent(),
- Optional.of(1000000L)
- ));
- statusHandler.statusUpdate(status);
- }
- }
-
- private void expectOfferAttributesSaved(HostOffer offer) {
- expect(storageUtil.attributeStore.getHostAttributes(offer.getOffer().getHostname()))
- .andReturn(Optional.absent());
- IHostAttributes defaultMode = IHostAttributes.build(
- Conversions.getAttributes(offer.getOffer()).newBuilder().setMode(NONE));
- expect(storageUtil.attributeStore.saveHostAttributes(defaultMode)).andReturn(true);
- }
-
- private abstract class AbstractRegisteredTest {
- private final AtomicBoolean runCalled = new AtomicBoolean(false);
-
- AbstractRegisteredTest() {
- // Prevent otherwise silent noop tests that forget to call run().
- addTearDown(new TearDown() {
- @Override
- public void tearDown() {
- assertTrue(runCalled.get());
- }
- });
- }
-
- void run() {
- runCalled.set(true);
- eventSink.post(new DriverRegistered());
- storageUtil.expectOperations();
- storageUtil.schedulerStore.saveFrameworkId(FRAMEWORK_ID);
- expectations();
-
- control.replay();
-
- scheduler.registered(
- driver,
- ProtosConversion.convert(FRAMEWORK),
- Protos.MasterInfo.getDefaultInstance());
- test();
- }
-
- void expectations() {
- // Default no-op, subclasses may override.
- }
-
- abstract void test();
- }
+ public void testFrameworkMessage() {
+ handler.handleMessage(EXECUTOR_ID, convert(SLAVE_ID));
+ expectLastCall().once();
- private abstract class AbstractOfferTest extends AbstractRegisteredTest {
- AbstractOfferTest() {
- super();
- }
-
- abstract void respondToOffer();
-
- @Override
- void expectations() {
- respondToOffer();
- }
+ control.replay();
- @Override
- void test() {
- scheduler.resourceOffers(
- driver,
- ImmutableList.of(ProtosConversion.convert(HOST_OFFER.getOffer())));
- }
+ scheduler.frameworkMessage(
+ driver,
+ convert(EXECUTOR_ID),
+ SLAVE_ID,
+ "message".getBytes(StandardCharsets.UTF_8));
}
- private abstract class AbstractStatusTest extends AbstractRegisteredTest {
- protected final TaskStatus status;
-
- AbstractStatusTest() {
- this(STATUS);
- }
-
- AbstractStatusTest(TaskStatus status) {
- super();
- this.status = status;
- }
+ @Test
+ public void testExecutorLost() {
+ handler.handleLostExecutor(EXECUTOR_ID, convert(SLAVE_ID), 1);
- @Override
- void test() {
- scheduler.statusUpdate(driver, ProtosConversion.convert(status));
- }
- }
+ control.replay();
- private abstract class AbstractStatusReconciliationTest extends AbstractStatusTest {
- AbstractStatusReconciliationTest() {
- super(STATUS_RECONCILIATION);
- }
+ scheduler.executorLost(driver, convert(EXECUTOR_ID), SLAVE_ID, 1);
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java
new file mode 100644
index 0000000..988ec50
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.net.InetAddresses;
+import com.google.protobuf.ByteString;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.stats.CachedCounters;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.mesos.v1.Protos.AgentID;
+import org.apache.mesos.v1.Protos.ExecutorID;
+import org.apache.mesos.v1.Protos.FrameworkID;
+import org.apache.mesos.v1.Protos.FrameworkInfo;
+import org.apache.mesos.v1.Protos.MasterInfo;
+import org.apache.mesos.v1.Protos.Offer;
+import org.apache.mesos.v1.Protos.OfferID;
+import org.apache.mesos.v1.Protos.TaskID;
+import org.apache.mesos.v1.Protos.TaskState;
+import org.apache.mesos.v1.Protos.TaskStatus;
+import org.apache.mesos.v1.Protos.TaskStatus.Source;
+import org.apache.mesos.v1.scheduler.Mesos;
+import org.apache.mesos.v1.scheduler.Protos.Call;
+import org.apache.mesos.v1.scheduler.Protos.Event;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class VersionedMesosSchedulerImplTest extends EasyMockTest {
+
+ private MesosCallbackHandler handler;
+ private StorageTestUtil storageUtil;
+ private Mesos driver;
+ private FakeStatsProvider statsProvider;
+ private DriverSettings driverSettings;
+
+ private VersionedMesosSchedulerImpl scheduler;
+
+ private static final String AGENT_HOST = "slave-hostname";
+ private static final AgentID AGENT_ID =
+ AgentID.newBuilder().setValue("slave-id").build();
+
+ private static final String FRAMEWORK_ID = "framework-id";
+ private static final FrameworkID FRAMEWORK =
+ FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build();
+ private static final FrameworkInfo FRAMEWORK_INFO = FrameworkInfo.newBuilder()
+ .setName("name")
+ .setUser("user")
+ .setCheckpoint(true)
+ .setFailoverTimeout(1000)
+ .build();
+
+ private static final String MASTER_ID = "master-id";
+ private static final MasterInfo MASTER = MasterInfo.newBuilder()
+ .setId(MASTER_ID)
+ .setIp(InetAddresses.coerceToInteger(InetAddresses.forString("1.2.3.4"))) //NOPMD
+ .setPort(5050).build();
+
+ private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("offer-id").build();
+ private static final Offer OFFER = Offer.newBuilder()
+ .setFrameworkId(FRAMEWORK)
+ .setAgentId(AGENT_ID)
+ .setHostname(AGENT_HOST)
+ .setId(OFFER_ID)
+ .build();
+
+ private static final TaskStatus STATUS = TaskStatus.newBuilder()
+ .setState(TaskState.TASK_RUNNING)
+ .setSource(Source.SOURCE_AGENT)
+ .setMessage("message")
+ .setTimestamp(1D)
+ .setTaskId(TaskID.newBuilder().setValue("task-id").build())
+ .build();
+
+ private static final ExecutorID EXECUTOR_ID =
+ ExecutorID.newBuilder().setValue("executor-id").build();
+
+ private static final Event OFFER_EVENT = Event.newBuilder()
+ .setType(Event.Type.OFFERS)
+ .setOffers(Event.Offers.newBuilder()
+ .addOffers(OFFER))
+ .build();
+
+ private static final Event SUBSCRIBED_EVENT = Event.newBuilder()
+ .setType(Event.Type.SUBSCRIBED)
+ .setSubscribed(Event.Subscribed.newBuilder()
+ .setFrameworkId(FRAMEWORK)
+ .setHeartbeatIntervalSeconds(15)
+ .setMasterInfo(MASTER))
+ .build();
+
+ private static final Event UPDATE_EVENT = Event.newBuilder()
+ .setType(Event.Type.UPDATE)
+ .setUpdate(Event.Update.newBuilder()
+ .setStatus(STATUS))
+ .build();
+
+ private static final Event RESCIND_EVENT = Event.newBuilder()
+ .setType(Event.Type.RESCIND)
+ .setRescind(Event.Rescind.newBuilder().setOfferId(OFFER_ID))
+ .build();
+
+ private static final Event MESSAGE_EVENT = Event.newBuilder()
+ .setType(Event.Type.MESSAGE)
+ .setMessage(Event.Message.newBuilder()
+ .setAgentId(AGENT_ID)
+ .setExecutorId(EXECUTOR_ID)
+ .setData(ByteString.copyFromUtf8("message")))
+ .build();
+
+ private static final Event ERROR_EVENT = Event.newBuilder()
+ .setType(Event.Type.ERROR)
+ .setError(Event.Error.newBuilder().setMessage("Oh no!"))
+ .build();
+
+ private static final Event FAILED_AGENT_EVENT = Event.newBuilder()
+ .setType(Event.Type.FAILURE)
+ .setFailure(Event.Failure.newBuilder().setAgentId(AGENT_ID))
+ .build();
+
+ @Before
+ public void setUp() {
+ handler = createMock(MesosCallbackHandler.class);
+ storageUtil = new StorageTestUtil(this);
+ driver = createMock(Mesos.class);
+ statsProvider = new FakeStatsProvider();
+ driverSettings = createMock(DriverSettings.class);
+
+ scheduler = new VersionedMesosSchedulerImpl(
+ handler,
+ new CachedCounters(statsProvider),
+ storageUtil.storage,
+ driverSettings);
+ }
+
+ @Test
+ public void testConnected() {
+ // Once the V1 driver has connected, we need to establish a subscription to get events
+
+ storageUtil.expectOperations();
+ expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+ expect(driverSettings.getFrameworkInfo()).andReturn(FRAMEWORK_INFO);
+
+ Capture<Call> subscribeCapture = createCapture();
+
+ driver.send(capture(subscribeCapture));
+ expectLastCall().once();
+
+ control.replay();
+
+ scheduler.connected(driver);
+
+ assertTrue(subscribeCapture.hasCaptured());
+
+ Call subscribe = subscribeCapture.getValue();
+
+ assertEquals(subscribe.getType(), Call.Type.SUBSCRIBE);
+ assertEquals(subscribe.getFrameworkId(), FRAMEWORK);
+ assertEquals(
+ subscribe.getSubscribe().getFrameworkInfo(),
+ FRAMEWORK_INFO.toBuilder().setId(FRAMEWORK).build());
+ }
+
+ @Test
+ public void testDisconnected() {
+ handler.handleDisconnection();
+
+ control.replay();
+
+ scheduler.disconnected(driver);
+ }
+
+ @Test
+ public void testSubscription() {
+ handler.handleRegistration(FRAMEWORK, MASTER);
+
+ control.replay();
+
+ scheduler.received(driver, SUBSCRIBED_EVENT);
+
+ assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_SUBSCRIBED"));
+ }
+
+ @Test
+ public void testOffers() {
+ handler.handleRegistration(FRAMEWORK, MASTER);
+ handler.handleOffers(ImmutableList.of(OFFER));
+
+ control.replay();
+
+ scheduler.received(driver, SUBSCRIBED_EVENT);
+ scheduler.received(driver, OFFER_EVENT);
+ assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_OFFERS"));
+ }
+
+ @Test
+ public void testRescind() {
+ handler.handleRescind(OFFER_ID);
+
+ control.replay();
+
+ scheduler.received(driver, RESCIND_EVENT);
+ assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_RESCIND"));
+ }
+
+ @Test
+ public void testUpdate() {
+ handler.handleUpdate(STATUS);
+
+ control.replay();
+
+ scheduler.received(driver, UPDATE_EVENT);
+
+ assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_UPDATE"));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testBadOrdering() {
+ // get an offer before the driver has registered
+
+ control.replay();
+
+ scheduler.received(driver, OFFER_EVENT);
+ }
+
+ @Test
+ public void testMessage() {
+ handler.handleMessage(EXECUTOR_ID, AGENT_ID);
+
+ control.replay();
+
+ scheduler.received(driver, MESSAGE_EVENT);
+ assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_MESSAGE"));
+ }
+
+ @Test
+ public void testError() {
+ handler.handleError("Oh no!");
+
+ control.replay();
+
+ scheduler.received(driver, ERROR_EVENT);
+ assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_ERROR"));
+ }
+
+ @Test
+ public void testFailedAgent() {
+ handler.handleLostAgent(AGENT_ID);
+
+ control.replay();
+
+ scheduler.received(driver, FAILED_AGENT_EVENT);
+ assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_FAILURE"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java
new file mode 100644
index 0000000..72aede8
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.mesos.v1.Protos;
+import org.apache.mesos.v1.scheduler.Mesos;
+import org.apache.mesos.v1.scheduler.Protos.Call;
+import org.apache.mesos.v1.scheduler.Scheduler;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class VersionedSchedulerDriverServiceTest extends EasyMockTest {
+
+ private static final String FRAMEWORK_ID = "framework_id";
+ private static final Protos.OfferID OFFER_ID =
+ Protos.OfferID.newBuilder().setValue("offer-id").build();
+ private static final Protos.Filters FILTER =
+ Protos.Filters.newBuilder().setRefuseSeconds(10).build();
+ private static final DriverSettings SETTINGS = new DriverSettings(
+ "fakemaster",
+ Optional.absent(),
+ Protos.FrameworkInfo.newBuilder()
+ .setUser("framework user")
+ .setName("test framework")
+ .build());
+
+ private StorageTestUtil storage;
+ private Scheduler scheduler;
+ private VersionedSchedulerDriverService driverService;
+ private VersionedDriverFactory driverFactory;
+ private Mesos mesos;
+
+ @Before
+ public void setUp() {
+ scheduler = createMock(Scheduler.class);
+ storage = new StorageTestUtil(this);
+ driverFactory = createMock(VersionedDriverFactory.class);
+ mesos = createMock(Mesos.class);
+ driverService = new VersionedSchedulerDriverService(
+ storage.storage,
+ SETTINGS,
+ scheduler,
+ driverFactory);
+ }
+
+ @Test
+ public void testNoopStop() {
+ control.replay();
+
+ driverService.stopAsync().awaitTerminated();
+ }
+
+ @Test
+ public void testStop() {
+ expectStart();
+ control.replay();
+
+ driverService.startAsync().awaitRunning();
+ driverService.stopAsync().awaitTerminated();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testExceptionBeforeStart() {
+ control.replay();
+ driverService.killTask("task-id");
+ }
+
+ @Test
+ public void testBlockingBeforeRegistered() throws InterruptedException {
+ expectStart();
+ control.replay();
+ driverService.startAsync().awaitRunning();
+
+ Thread killRunner = new Thread(() -> {
+ driverService.killTask("task-id");
+ });
+
+ killRunner.start();
+
+ // A hack to ensure the thread actually executes the method
+ Thread.sleep(1000L);
+ assertEquals(Thread.State.WAITING, killRunner.getState());
+
+ killRunner.interrupt();
+ }
+
+ @Test
+ public void testKill() {
+ expectStart();
+ expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+
+ Capture<Call> killCapture = createCapture();
+ mesos.send(capture(killCapture));
+ expectLastCall().once();
+
+ control.replay();
+ driverService.startAsync().awaitRunning();
+ driverService.registered(new PubsubEvent.DriverRegistered());
+
+ driverService.killTask("task-id");
+
+ assertTrue(killCapture.hasCaptured());
+ Call kill = killCapture.getValue();
+ assertEquals(kill.getFrameworkId().getValue(), FRAMEWORK_ID);
+ assertEquals(kill.getType(), Call.Type.KILL);
+ assertEquals(kill.getKill().getTaskId().getValue(), "task-id");
+ }
+
+ @Test
+ public void testDecline() {
+ expectStart();
+ expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+
+ Capture<Call> declineCapture = createCapture();
+ mesos.send(capture(declineCapture));
+ expectLastCall().once();
+
+ control.replay();
+ driverService.startAsync().awaitRunning();
+ driverService.registered(new PubsubEvent.DriverRegistered());
+
+ driverService.declineOffer(OFFER_ID, FILTER);
+
+ assertTrue(declineCapture.hasCaptured());
+ Call decline = declineCapture.getValue();
+ assertEquals(decline.getFrameworkId().getValue(), FRAMEWORK_ID);
+ assertEquals(decline.getType(), Call.Type.DECLINE);
+ assertEquals(decline.getDecline().getOfferIdsList(), ImmutableList.of(OFFER_ID));
+ assertEquals(decline.getDecline().getFilters(), FILTER);
+ }
+
+ @Test
+ public void testAccept() {
+ expectStart();
+ expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+
+ Capture<Call> acceptCapture = createCapture();
+ mesos.send(capture(acceptCapture));
+ expectLastCall().once();
+
+ control.replay();
+ driverService.startAsync().awaitRunning();
+ driverService.registered(new PubsubEvent.DriverRegistered());
+
+ driverService.acceptOffers(OFFER_ID, ImmutableList.of(), FILTER);
+
+ assertTrue(acceptCapture.hasCaptured());
+ Call accept = acceptCapture.getValue();
+ assertEquals(accept.getFrameworkId().getValue(), FRAMEWORK_ID);
+ assertEquals(accept.getType(), Call.Type.ACCEPT);
+ assertEquals(accept.getAccept().getFilters(), FILTER);
+ assertEquals(accept.getAccept().getOfferIdsList(), ImmutableList.of(OFFER_ID));
+ assertEquals(accept.getAccept().getOperationsList(), ImmutableList.of());
+ }
+
+ private void expectStart() {
+ storage.expectOperations();
+ expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+
+ Protos.FrameworkInfo.Builder builder = SETTINGS.getFrameworkInfo().toBuilder();
+ builder.setId(Protos.FrameworkID.newBuilder().setValue(FRAMEWORK_ID));
+
+ expect(driverFactory.create(
+ scheduler,
+ builder.build(),
+ SETTINGS.getMasterUri(),
+ SETTINGS.getCredentials()
+ )).andReturn(mesos);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index f2275c7..1421821 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -67,6 +67,7 @@ import org.apache.shiro.subject.Subject;
import org.junit.Test;
import static org.apache.aurora.gen.ResponseCode.OK;
+import static org.apache.aurora.scheduler.app.SchedulerMain.DriverKind.SCHEDULER_DRIVER;
import static org.junit.Assert.assertEquals;
public class ThriftIT extends EasyMockTest {
@@ -97,7 +98,7 @@ public class ThriftIT extends EasyMockTest {
install(new TierModule(TaskTestUtil.TIER_CONFIG));
bind(ExecutorSettings.class).toInstance(TestExecutorSettings.THERMOS_EXECUTOR);
- install(new AppModule(configurationManagerSettings));
+ install(new AppModule(configurationManagerSettings, SCHEDULER_DRIVER));
bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class);
[2/2] aurora git commit: Enable Mesos HTTP API.
Posted by zm...@apache.org.
Enable Mesos HTTP API.
This patch completes the work described in the design doc[1] and enables operators to choose between
two V1 Mesos API implementations. The first is `V0Mesos` which offers the V1 API
backed by the scheduler driver and the second is `V1Mesos` which offers the V1
API backed by a new HTTP API implementation.
There are three sets of changes in this patch.
First, the V1 Mesos code requires a Scheduler callback with a different API. To
maximize code reuse, event handling logic was extracted into a
`MesosCallbackHandler` class. `VersionedMesosSchedulerImpl` was created to
implement the new callback interface. Both callbacks now use the handler class
for logic.
Second, a new driver implementation using the new API was created. All of the
logic for the new driver is encapsulated in the
`VersionedSchedulerDriverService` class.
Third, some wiring changes were made to allow Guice to do its work and
allow for operators to select between the different driver implementations.
[1] https://docs.google.com/document/d/1bWK8ldaQSsRXvdKwTh8tyR_0qMxAlnMW70eOKoU3myo
Testing Done:
The e2e test has been run three times, each time with a different driver option.
Bugs closed: AURORA-1887, AURORA-1888
Reviewed at https://reviews.apache.org/r/57061/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/705dbc7c
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/705dbc7c
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/705dbc7c
Branch: refs/heads/master
Commit: 705dbc7cd7c3ff477bcf766cdafe49a68ab47dee
Parents: 2652fe0
Author: Zameer Manji <zm...@apache.org>
Authored: Thu Mar 2 15:07:11 2017 -0800
Committer: Zameer Manji <zm...@apache.org>
Committed: Thu Mar 2 15:07:11 2017 -0800
----------------------------------------------------------------------
RELEASE-NOTES.md | 7 +
examples/vagrant/upstart/aurora-scheduler.conf | 5 +-
.../aurora/benchmark/StatusUpdateBenchmark.java | 6 +-
.../apache/aurora/scheduler/app/AppModule.java | 12 +-
.../aurora/scheduler/app/SchedulerMain.java | 22 +-
.../scheduler/mesos/LibMesosLoadingModule.java | 29 +-
.../scheduler/mesos/MesosCallbackHandler.java | 288 +++++++++++++
.../scheduler/mesos/MesosSchedulerImpl.java | 212 +--------
.../scheduler/mesos/ProtosConversion.java | 28 ++
.../scheduler/mesos/SchedulerDriverModule.java | 50 ++-
.../scheduler/mesos/VersionedDriverFactory.java | 32 ++
.../mesos/VersionedMesosSchedulerImpl.java | 198 +++++++++
.../mesos/VersionedSchedulerDriverService.java | 254 +++++++++++
.../aurora/scheduler/app/SchedulerIT.java | 7 +-
.../mesos/MesosCallbackHandlerTest.java | 430 +++++++++++++++++++
.../scheduler/mesos/MesosSchedulerImplTest.java | 424 ++++--------------
.../mesos/VersionedMesosSchedulerImplTest.java | 275 ++++++++++++
.../VersionedSchedulerDriverServiceTest.java | 194 +++++++++
.../aurora/scheduler/thrift/ThriftIT.java | 3 +-
19 files changed, 1907 insertions(+), 569 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 2391d32..1142f75 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -6,6 +6,13 @@
- Add message parameter to `killTasks` RPC.
- Add prune_tasks endpoint to aurora_admin. See aurora_admin prune_tasks -h for usage information.
- Add support for per-task volume mounts for Mesos containers to the Aurora config DSL.
+* Added the `-mesos_driver` flag to the scheduler with three possible options:
+ `SCHEDULER_DRIVER`, `V0_DRIVER`, `V1_DRIVER`. The first uses the original driver
+ and the latter two use two new drivers from `libmesos`. `V0_DRIVER` uses the
+ `SCHEDULER_DRIVER` under the hood and `V1_DRIVER` uses a new HTTP API aware
+ driver. Users that want to use the HTTP API should use `V1_DRIVER`.
+ Performance-sensitive users should stick with the `SCHEDULER_DRIVER` or
+ `V0_DRIVER` drivers.
0.17.0
======
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/examples/vagrant/upstart/aurora-scheduler.conf
----------------------------------------------------------------------
diff --git a/examples/vagrant/upstart/aurora-scheduler.conf b/examples/vagrant/upstart/aurora-scheduler.conf
index 49fdcbd..31fa036 100644
--- a/examples/vagrant/upstart/aurora-scheduler.conf
+++ b/examples/vagrant/upstart/aurora-scheduler.conf
@@ -16,7 +16,7 @@ respawn
post-stop exec sleep 5
# Environment variables control the behavior of the Mesos scheduler driver (libmesos).
-env GLOG_v=0
+env GLOG_v=1
env LIBPROCESS_PORT=8083
env LIBPROCESS_IP=192.168.33.7
env DIST_DIR=/home/vagrant/aurora/dist
@@ -54,4 +54,5 @@ exec bin/aurora-scheduler \
-enable_revocable_ram=true \
-allow_gpu_resource=true \
-allow_container_volumes=true \
- -offer_filter_duration=0secs
+ -offer_filter_duration=0secs \
+ -mesos_driver=V1_DRIVER
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
index 6c2bf46..95496c1 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
@@ -61,6 +61,8 @@ import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.mesos.DriverFactory;
import org.apache.aurora.scheduler.mesos.DriverSettings;
+import org.apache.aurora.scheduler.mesos.MesosCallbackHandler;
+import org.apache.aurora.scheduler.mesos.MesosCallbackHandler.MesosCallbackHandlerImpl;
import org.apache.aurora.scheduler.mesos.MesosSchedulerImpl;
import org.apache.aurora.scheduler.mesos.ProtosConversion;
import org.apache.aurora.scheduler.mesos.TestExecutorSettings;
@@ -183,8 +185,10 @@ public class StatusUpdateBenchmark {
bind(Driver.class).toInstance(new FakeDriver());
bind(Scheduler.class).to(MesosSchedulerImpl.class);
bind(MesosSchedulerImpl.class).in(Singleton.class);
+ bind(MesosCallbackHandler.class).to(MesosCallbackHandlerImpl.class);
+ bind(MesosCallbackHandlerImpl.class).in(Singleton.class);
bind(Executor.class)
- .annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)
+ .annotatedWith(MesosCallbackHandlerImpl.SchedulerExecutor.class)
.toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor(
"SchedulerImpl-%d",
LoggerFactory.getLogger(StatusUpdateBenchmark.class)));
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index e2ef9c3..081dff2 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -35,6 +35,7 @@ import org.apache.aurora.gen.Container;
import org.apache.aurora.gen.Container._Fields;
import org.apache.aurora.scheduler.SchedulerModule;
import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.app.SchedulerMain.DriverKind;
import org.apache.aurora.scheduler.async.AsyncModule;
import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings;
import org.apache.aurora.scheduler.events.PubsubEventModule;
@@ -105,13 +106,15 @@ public class AppModule extends AbstractModule {
private static final Arg<Boolean> ALLOW_CONTAINER_VOLUMES = Arg.create(false);
private final ConfigurationManagerSettings configurationManagerSettings;
+ private final DriverKind kind;
+ /**
+ * Creates the application module.
+ *
+ * @param configurationManagerSettings Settings for the configuration manager.
+ * @param kind Which Mesos driver implementation the scheduler driver module should wire up.
+ */
@VisibleForTesting
- public AppModule(ConfigurationManagerSettings configurationManagerSettings) {
+ public AppModule(ConfigurationManagerSettings configurationManagerSettings, DriverKind kind) {
this.configurationManagerSettings = requireNonNull(configurationManagerSettings);
+ this.kind = requireNonNull(kind);
}
- public AppModule(boolean allowGpuResource) {
+ public AppModule(boolean allowGpuResource, DriverKind kind) {
this(new ConfigurationManagerSettings(
ImmutableSet.copyOf(ALLOWED_CONTAINER_TYPES.get()),
ENABLE_DOCKER_PARAMETERS.get(),
@@ -119,7 +122,8 @@ public class AppModule extends AbstractModule {
REQUIRE_DOCKER_USE_EXECUTOR.get(),
allowGpuResource,
ENABLE_MESOS_FETCHER.get(),
- ALLOW_CONTAINER_VOLUMES.get()));
+ ALLOW_CONTAINER_VOLUMES.get()),
+ kind);
}
@Override
@@ -149,7 +153,7 @@ public class AppModule extends AbstractModule {
install(new QuotaModule());
install(new JettyServerModule());
install(new PreemptorModule());
- install(new SchedulerDriverModule());
+ install(new SchedulerDriverModule(kind));
install(new SchedulerServicesModule());
install(new SchedulerModule());
install(new StateModule());
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index 805e9de..3665c4d 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -93,6 +93,24 @@ public class SchedulerMain {
@CmdLine(name = "allow_gpu_resource", help = "Allow jobs to request Mesos GPU resource.")
private static final Arg<Boolean> ALLOW_GPU_RESOURCE = Arg.create(false);
+ /**
+ * Mesos driver implementations selectable via the {@code -mesos_driver} flag.
+ */
+ public enum DriverKind {
+ // TODO(zmanji): Remove this option once V0_DRIVER has been proven out in production.
+ // This is the original driver that libmesos shipped with. Uses unversioned protobufs, and has
+ // minimal backwards compatibility guarantees.
+ SCHEDULER_DRIVER,
+ // These are the new drivers that libmesos ships with. They use versioned (V1) protobufs for
+ // the Java API.
+ // V0 Driver offers the V1 API over the old Scheduler Driver. It does not fully support
+ // the V1 API (e.g. Mesos maintenance).
+ V0_DRIVER,
+ // V1 Driver offers the V1 API over a full HTTP API implementation. It allows for maintenance
+ // primitives and other new features.
+ V1_DRIVER,
+ }
+
+ @CmdLine(name = "mesos_driver", help = "Which Mesos Driver to use")
+ private static final Arg<DriverKind> DRIVER_IMPL = Arg.create(DriverKind.SCHEDULER_DRIVER);
+
@Inject private SingletonService schedulerService;
@Inject private HttpService httpService;
@Inject private SchedulerLifecycle schedulerLifecycle;
@@ -141,7 +159,7 @@ public class SchedulerMain {
return Modules.combine(
new LifecycleModule(),
new StatsModule(),
- new AppModule(ALLOW_GPU_RESOURCE.get()),
+ new AppModule(ALLOW_GPU_RESOURCE.get(), DRIVER_IMPL.get()),
new CronModule(),
new DbModule.MigrationManagerModule(),
DbModule.productionModule(Bindings.annotatedKeyFactory(Storage.Volatile.class)),
@@ -203,7 +221,7 @@ public class SchedulerMain {
List<Module> modules = ImmutableList.<Module>builder()
.add(
new CommandLineDriverSettingsModule(ALLOW_GPU_RESOURCE.get()),
- new LibMesosLoadingModule(),
+ new LibMesosLoadingModule(DRIVER_IMPL.get()),
new MesosLogStreamModule(FlaggedZooKeeperConfig.create()),
new LogStorageModule(),
new TierModule(),
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java
index e1a2359..3e943ff 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java
@@ -15,12 +15,39 @@ package org.apache.aurora.scheduler.mesos;
import com.google.inject.AbstractModule;
+import org.apache.aurora.scheduler.app.SchedulerMain;
+import org.apache.mesos.v1.scheduler.V0Mesos;
+import org.apache.mesos.v1.scheduler.V1Mesos;
+
+import static com.google.common.base.Preconditions.checkState;
+
/**
* A module that binds a driver factory which requires the libmesos native libary.
*/
public class LibMesosLoadingModule extends AbstractModule {
+ private final SchedulerMain.DriverKind kind;
+
+ /**
+ * Creates a module that binds the driver factory matching the requested driver kind.
+ *
+ * @param kind Which libmesos driver implementation to bind.
+ */
+ public LibMesosLoadingModule(SchedulerMain.DriverKind kind) {
+ this.kind = kind;
+ }
+
@Override
protected void configure() {
- bind(DriverFactory.class).to(DriverFactoryImpl.class);
+ switch(kind) {
+ case SCHEDULER_DRIVER:
+ // The original, unversioned driver is exposed through the DriverFactory binding.
+ bind(DriverFactory.class).to(DriverFactoryImpl.class);
+ break;
+ case V0_DRIVER:
+ // V1 API backed by the old scheduler driver; it takes the FrameworkInfo up front.
+ bind(VersionedDriverFactory.class).toInstance((scheduler, frameworkInfo, master, creds)
+ -> new V0Mesos(scheduler, frameworkInfo, master, creds.orNull()));
+ break;
+ case V1_DRIVER:
+ // HTTP API driver; frameworkInfo is intentionally not passed to the V1Mesos constructor.
+ bind(VersionedDriverFactory.class).toInstance((scheduler, frameworkInfo, master, creds)
+ -> new V1Mesos(scheduler, master, creds.orNull()));
+ break;
+ default:
+ // Include the offending value so a newly added, unwired kind is easy to diagnose.
+ checkState(false, "Unknown driver kind: %s", kind);
+ break;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
new file mode 100644
index 0000000..801551b
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.inject.Inject;
+import javax.inject.Qualifier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+
+import org.apache.aurora.common.application.Lifecycle;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.TaskStatusHandler;
+import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.mesos.v1.Protos.AgentID;
+import org.apache.mesos.v1.Protos.ExecutorID;
+import org.apache.mesos.v1.Protos.FrameworkID;
+import org.apache.mesos.v1.Protos.MasterInfo;
+import org.apache.mesos.v1.Protos.Offer;
+import org.apache.mesos.v1.Protos.OfferID;
+import org.apache.mesos.v1.Protos.TaskStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.mesos.v1.Protos.TaskStatus.Reason.REASON_RECONCILIATION;
+
+/**
+ * Abstracts the logic of handling scheduler events/callbacks from Mesos.
+ * This interface allows the two different Mesos Scheduler Callback classes
+ * to share logic and to simplify testing.
+ */
+public interface MesosCallbackHandler {
+ void handleRegistration(FrameworkID frameworkId, MasterInfo masterInfo);
+ void handleReregistration(MasterInfo masterInfo);
+ void handleOffers(List<Offer> offers);
+ void handleDisconnection();
+ void handleRescind(OfferID offerId);
+ void handleMessage(ExecutorID executor, AgentID agent);
+ void handleError(String message);
+ void handleUpdate(TaskStatus status);
+ void handleLostAgent(AgentID agentId);
+ void handleLostExecutor(ExecutorID executorID, AgentID slaveID, int status);
+
+ class MesosCallbackHandlerImpl implements MesosCallbackHandler {
+
+ private final TaskStatusHandler taskStatusHandler;
+ private final OfferManager offerManager;
+ private final Storage storage;
+ private final Lifecycle lifecycle;
+ private final EventSink eventSink;
+ private final Executor executor;
+ private final Logger log;
+
+ private final AtomicLong offersRescinded;
+ private final AtomicLong slavesLost;
+ private final AtomicLong reRegisters;
+ private final AtomicLong offersRecieved;
+ private final AtomicLong disconnects;
+ private final AtomicLong executorsLost;
+
+ /**
+ * Binding annotation for the executor the incoming Mesos message handler uses.
+ */
+ @VisibleForTesting
+ @Qualifier
+ @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+ public @interface SchedulerExecutor { }
+
+ /**
+ * Creates a new handler for callbacks.
+ *
+ * @param storage Store to save host attributes into.
+ * @param lifecycle Application lifecycle manager.
+ * @param taskStatusHandler Task status update manager.
+ * @param offerManager Offer manager.
+ * @param eventSink Pubsub sink to send driver status changes to.
+ * @param executor Executor for async work
+ */
+ @Inject
+ public MesosCallbackHandlerImpl(
+ Storage storage,
+ Lifecycle lifecycle,
+ TaskStatusHandler taskStatusHandler,
+ OfferManager offerManager,
+ EventSink eventSink,
+ @SchedulerExecutor Executor executor,
+ StatsProvider statsProvider) {
+
+ this(
+ storage,
+ lifecycle,
+ taskStatusHandler,
+ offerManager,
+ eventSink,
+ executor,
+ LoggerFactory.getLogger(MesosCallbackHandlerImpl.class),
+ statsProvider);
+ }
+
+ @VisibleForTesting
+ MesosCallbackHandlerImpl(
+ Storage storage,
+ Lifecycle lifecycle,
+ TaskStatusHandler taskStatusHandler,
+ OfferManager offerManager,
+ EventSink eventSink,
+ Executor executor,
+ Logger log,
+ StatsProvider statsProvider) {
+
+ this.storage = requireNonNull(storage);
+ this.lifecycle = requireNonNull(lifecycle);
+ this.taskStatusHandler = requireNonNull(taskStatusHandler);
+ this.offerManager = requireNonNull(offerManager);
+ this.eventSink = requireNonNull(eventSink);
+ this.executor = requireNonNull(executor);
+ this.log = requireNonNull(log);
+
+ this.offersRescinded = statsProvider.makeCounter("offers_rescinded");
+ this.slavesLost = statsProvider.makeCounter("slaves_lost");
+ this.reRegisters = statsProvider.makeCounter("scheduler_framework_reregisters");
+ this.offersRecieved = statsProvider.makeCounter("scheduler_resource_offers");
+ this.disconnects = statsProvider.makeCounter("scheduler_framework_disconnects");
+ this.executorsLost = statsProvider.makeCounter("scheduler_lost_executors");
+ }
+
+ @Override
+ public void handleRegistration(FrameworkID frameworkId, MasterInfo masterInfo) {
+ log.info("Registered with ID " + frameworkId + ", master: " + masterInfo);
+
+ storage.write(
+ (Storage.MutateWork.NoResult.Quiet) storeProvider ->
+ storeProvider.getSchedulerStore().saveFrameworkId(frameworkId.getValue()));
+ eventSink.post(new PubsubEvent.DriverRegistered());
+ }
+
+ @Override
+ public void handleReregistration(MasterInfo masterInfo) {
+ log.info("Framework re-registered with master " + masterInfo);
+ reRegisters.incrementAndGet();
+ }
+
+ @Override
+ public void handleOffers(List<Offer> offers) {
+ // Don't invoke the executor or storage lock if the list of offers is empty.
+ if (offers.isEmpty()) {
+ return;
+ }
+
+ executor.execute(() -> {
+ // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over
+ // offers when the host attributes cannot be found. (AURORA-137)
+ storage.write((Storage.MutateWork.NoResult.Quiet) storeProvider -> {
+ for (Offer offer : offers) {
+ IHostAttributes attributes =
+ AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), offer);
+ storeProvider.getAttributeStore().saveHostAttributes(attributes);
+ log.debug("Received offer: {}", offer);
+ offersRecieved.incrementAndGet();
+ offerManager.addOffer(new HostOffer(offer, attributes));
+ }
+ });
+ });
+ }
+
+ @Override
+ public void handleDisconnection() {
+ log.warn("Framework disconnected.");
+ this.disconnects.incrementAndGet();
+ eventSink.post(new PubsubEvent.DriverDisconnected());
+ }
+
+ @Override
+ public void handleRescind(OfferID offerId) {
+ log.info("Offer rescinded: " + offerId);
+ offerManager.cancelOffer(offerId);
+ offersRescinded.incrementAndGet();
+ }
+
+ @Override
+ public void handleMessage(ExecutorID executorID, AgentID agentID) {
+ log.warn(
+ "Ignoring framework message from {} on {}.",
+ executorID.getValue(),
+ agentID.getValue());
+ }
+
+ @Override
+ public void handleError(String message) {
+ log.error("Received error message: " + message);
+ lifecycle.shutdown();
+ }
+
+ private static void logStatusUpdate(Logger logger, TaskStatus status) {
+ // Periodic task reconciliation runs generate a large amount of no-op messages.
+ // Suppress logging for reconciliation status updates by default.
+ boolean debugLevel = status.hasReason() && status.getReason() == REASON_RECONCILIATION;
+
+ StringBuilder message = new StringBuilder("Received status update for task ")
+ .append(status.getTaskId().getValue())
+ .append(" in state ")
+ .append(status.getState());
+ if (status.hasSource()) {
+ message.append(" from ").append(status.getSource());
+ }
+ if (status.hasReason()) {
+ message.append(" with ").append(status.getReason());
+ }
+ if (status.hasMessage()) {
+ message.append(": ").append(status.getMessage());
+ }
+ if (debugLevel) {
+ logger.debug(message.toString());
+ } else {
+ logger.info(message.toString());
+ }
+ }
+
+ private static final Function<Double, Long> SECONDS_TO_MICROS =
+ seconds -> (long) (seconds * 1E6);
+
+ @Override
+ public void handleUpdate(TaskStatus status) {
+ logStatusUpdate(log, status);
+ eventSink.post(new PubsubEvent.TaskStatusReceived(
+ status.getState(),
+ // Source and Reason are enums. They cannot be null so we we need to use `hasXXX`.
+ status.hasSource() ? Optional.of(status.getSource()) : Optional.absent(),
+ status.hasReason() ? Optional.of(status.getReason()) : Optional.absent(),
+ Optional.fromNullable(status.getTimestamp()).transform(SECONDS_TO_MICROS)));
+
+ try {
+ // The status handler is responsible for acknowledging the update.
+ taskStatusHandler.statusUpdate(status);
+ } catch (SchedulerException e) {
+ log.error("Status update failed due to scheduler exception: " + e, e);
+ // We re-throw the exception here to trigger an abort of the driver.
+ throw e;
+ }
+ }
+
+ @Override
+ public void handleLostAgent(AgentID agentId) {
+ log.info("Received notification of lost agent: " + agentId);
+ slavesLost.incrementAndGet();
+ }
+
+ @Override
+ public void handleLostExecutor(ExecutorID executorID, AgentID slaveID, int status) {
+ // With the current implementation of MESOS-313, Mesos is also reporting clean terminations of
+ // custom executors via the executorLost callback.
+ if (status != 0) {
+ log.warn("Lost executor " + executorID + " on slave " + slaveID + " with status " + status);
+ executorsLost.incrementAndGet();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
index eb21096..c3a34d2 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
@@ -13,37 +13,14 @@
*/
package org.apache.aurora.scheduler.mesos;
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.inject.Inject;
-import javax.inject.Qualifier;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
import org.apache.aurora.GuiceUtils.AllowUnchecked;
-import org.apache.aurora.common.application.Lifecycle;
import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.stats.StatsProvider;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.TaskStatusHandler;
-import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStatusReceived;
-import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.mesos.Protos.ExecutorID;
import org.apache.mesos.Protos.FrameworkID;
import org.apache.mesos.Protos.MasterInfo;
@@ -53,103 +30,29 @@ import org.apache.mesos.Protos.SlaveID;
import org.apache.mesos.Protos.TaskStatus;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;
-import static org.apache.mesos.Protos.TaskStatus.Reason.REASON_RECONCILIATION;
+import static com.google.common.base.Preconditions.checkState;
+
+import static org.apache.aurora.scheduler.mesos.ProtosConversion.convert;
/**
- * Location for communication with mesos.
+ * Implementation of Scheduler callback interfaces for Mesos SchedulerDriver.
*/
@VisibleForTesting
public class MesosSchedulerImpl implements Scheduler {
- private final TaskStatusHandler taskStatusHandler;
- private final OfferManager offerManager;
- private final Storage storage;
- private final Lifecycle lifecycle;
- private final EventSink eventSink;
- private final Executor executor;
- private final Logger log;
- private final CachedCounters counters;
+ private final MesosCallbackHandler handler;
private volatile boolean isRegistered = false;
- private final AtomicLong offersRescinded;
- private final AtomicLong slavesLost;
- /**
- * Binding annotation for the executor the incoming Mesos message handler uses.
- */
- @VisibleForTesting
- @Qualifier
- @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
- public @interface SchedulerExecutor { }
-
- /**
- * Creates a new scheduler.
- *
- * @param storage Store to save host attributes into.
- * @param lifecycle Application lifecycle manager.
- * @param taskStatusHandler Task status update manager.
- * @param offerManager Offer manager.
- * @param eventSink Pubsub sink to send driver status changes to.
- * @param executor Executor for async work
- */
@Inject
- public MesosSchedulerImpl(
- Storage storage,
- Lifecycle lifecycle,
- TaskStatusHandler taskStatusHandler,
- OfferManager offerManager,
- EventSink eventSink,
- @SchedulerExecutor Executor executor,
- CachedCounters counters,
- StatsProvider statsProvider) {
-
- this(
- storage,
- lifecycle,
- taskStatusHandler,
- offerManager,
- eventSink,
- executor,
- counters,
- LoggerFactory.getLogger(MesosSchedulerImpl.class),
- statsProvider);
- }
-
- @VisibleForTesting
- MesosSchedulerImpl(
- Storage storage,
- Lifecycle lifecycle,
- TaskStatusHandler taskStatusHandler,
- OfferManager offerManager,
- EventSink eventSink,
- Executor executor,
- CachedCounters counters,
- Logger log,
- StatsProvider statsProvider) {
-
- this.storage = requireNonNull(storage);
- this.lifecycle = requireNonNull(lifecycle);
- this.taskStatusHandler = requireNonNull(taskStatusHandler);
- this.offerManager = requireNonNull(offerManager);
- this.eventSink = requireNonNull(eventSink);
- this.executor = requireNonNull(executor);
- this.counters = requireNonNull(counters);
- this.log = requireNonNull(log);
- this.offersRescinded = statsProvider.makeCounter("offers_rescinded");
- this.slavesLost = statsProvider.makeCounter("slaves_lost");
+ MesosSchedulerImpl(MesosCallbackHandler handler) {
+ this.handler = requireNonNull(handler);
}
@Override
public void slaveLost(SchedulerDriver schedulerDriver, SlaveID slaveId) {
- log.info("Received notification of lost agent: " + slaveId);
- slavesLost.incrementAndGet();
+ handler.handleLostAgent(convert(slaveId));
}
@Override
@@ -157,122 +60,48 @@ public class MesosSchedulerImpl implements Scheduler {
SchedulerDriver driver,
final FrameworkID frameworkId,
MasterInfo masterInfo) {
-
- log.info("Registered with ID " + frameworkId + ", master: " + masterInfo);
-
- storage.write(
- (NoResult.Quiet) storeProvider ->
- storeProvider.getSchedulerStore().saveFrameworkId(frameworkId.getValue()));
+ handler.handleRegistration(convert(frameworkId), convert(masterInfo));
isRegistered = true;
- eventSink.post(new DriverRegistered());
}
@Override
public void disconnected(SchedulerDriver schedulerDriver) {
- log.warn("Framework disconnected.");
- counters.get("scheduler_framework_disconnects").incrementAndGet();
- eventSink.post(new DriverDisconnected());
+ handler.handleDisconnection();
}
@Override
public void reregistered(SchedulerDriver schedulerDriver, MasterInfo masterInfo) {
- log.info("Framework re-registered with master " + masterInfo);
- counters.get("scheduler_framework_reregisters").incrementAndGet();
+ handler.handleReregistration(convert(masterInfo));
}
@Timed("scheduler_resource_offers")
@Override
public void resourceOffers(SchedulerDriver driver, final List<Offer> offers) {
- Preconditions.checkState(isRegistered, "Must be registered before receiving offers.");
-
- executor.execute(() -> {
- // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over
- // offers when the host attributes cannot be found. (AURORA-137)
- storage.write((NoResult.Quiet) storeProvider -> {
- for (Offer offer : offers) {
- org.apache.mesos.v1.Protos.Offer o = ProtosConversion.convert(offer);
- IHostAttributes attributes =
- AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), o);
- storeProvider.getAttributeStore().saveHostAttributes(attributes);
- log.debug("Received offer: {}", offer);
- counters.get("scheduler_resource_offers").incrementAndGet();
- offerManager.addOffer(new HostOffer(o, attributes));
- }
- });
- });
+ checkState(isRegistered, "Must be registered before receiving offers.");
+ handler.handleOffers(Lists.transform(offers, ProtosConversion::convert));
}
@Override
public void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerId) {
- log.info("Offer rescinded: " + offerId);
- offerManager.cancelOffer(ProtosConversion.convert(offerId));
- offersRescinded.incrementAndGet();
- }
-
- private static void logStatusUpdate(Logger logger, TaskStatus status) {
- // Periodic task reconciliation runs generate a large amount of no-op messages.
- // Suppress logging for reconciliation status updates by default.
- boolean debugLevel = status.hasReason() && status.getReason() == REASON_RECONCILIATION;
-
- StringBuilder message = new StringBuilder("Received status update for task ")
- .append(status.getTaskId().getValue())
- .append(" in state ")
- .append(status.getState());
- if (status.hasSource()) {
- message.append(" from ").append(status.getSource());
- }
- if (status.hasReason()) {
- message.append(" with ").append(status.getReason());
- }
- if (status.hasMessage()) {
- message.append(": ").append(status.getMessage());
- }
- if (debugLevel) {
- logger.debug(message.toString());
- } else {
- logger.info(message.toString());
- }
+ handler.handleRescind(convert(offerId));
}
- private static final Function<Double, Long> SECONDS_TO_MICROS = seconds -> (long) (seconds * 1E6);
-
@AllowUnchecked
@Timed("scheduler_status_update")
@Override
public void statusUpdate(SchedulerDriver driver, TaskStatus status) {
- logStatusUpdate(log, status);
- org.apache.mesos.v1.Protos.TaskStatus converted = ProtosConversion.convert(status);
- eventSink.post(new TaskStatusReceived(
- converted.getState(),
- Optional.fromNullable(converted.getSource()),
- converted.hasReason() ? Optional.of(converted.getReason()) : Optional.absent(),
- Optional.fromNullable(converted.getTimestamp()).transform(SECONDS_TO_MICROS)));
-
- try {
- // The status handler is responsible for acknowledging the update.
- taskStatusHandler.statusUpdate(converted);
- } catch (SchedulerException e) {
- log.error("Status update failed due to scheduler exception: " + e, e);
- // We re-throw the exception here to trigger an abort of the driver.
- throw e;
- }
+ handler.handleUpdate(convert(status));
}
@Override
public void error(SchedulerDriver driver, String message) {
- log.error("Received error message: " + message);
- lifecycle.shutdown();
+ handler.handleError(message);
}
@Override
public void executorLost(SchedulerDriver schedulerDriver, ExecutorID executorID, SlaveID slaveID,
int status) {
- // With the current implementation of MESOS-313, Mesos is also reporting clean terminations of
- // custom executors via the executorLost callback.
- if (status != 0) {
- log.warn("Lost executor " + executorID + " on slave " + slaveID + " with status " + status);
- counters.get("scheduler_lost_executors").incrementAndGet();
- }
+ handler.handleLostExecutor(convert(executorID), convert(slaveID), status);
}
@Timed("scheduler_framework_message")
@@ -282,7 +111,6 @@ public class MesosSchedulerImpl implements Scheduler {
ExecutorID executorID,
SlaveID slave,
byte[] data) {
-
- log.warn("Ignoring framework message.");
+ handler.handleMessage(convert(executorID), convert(slave));
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/ProtosConversion.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/ProtosConversion.java b/src/main/java/org/apache/aurora/scheduler/mesos/ProtosConversion.java
index bc9e23b..26112af 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/ProtosConversion.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/ProtosConversion.java
@@ -55,16 +55,36 @@ public final class ProtosConversion {
return convert(id, Protos.OfferID.newBuilder());
}
+ public static Protos.FrameworkID convert(org.apache.mesos.Protos.FrameworkID id) {
+ return convert(id, Protos.FrameworkID.newBuilder());
+ }
+
+ public static Protos.MasterInfo convert(org.apache.mesos.Protos.MasterInfo id) {
+ return convert(id, Protos.MasterInfo.newBuilder());
+ }
+
public static Protos.TaskStatus convert(org.apache.mesos.Protos.TaskStatus s) {
return convert(s, Protos.TaskStatus.newBuilder());
}
+ public static Protos.AgentID convert(org.apache.mesos.Protos.SlaveID s) {
+ return convert(s, Protos.AgentID.newBuilder());
+ }
+
+ public static Protos.ExecutorID convert(org.apache.mesos.Protos.ExecutorID s) {
+ return convert(s, Protos.ExecutorID.newBuilder());
+ }
+
// Methods to convert from V1 to unversioned.
public static org.apache.mesos.Protos.FrameworkID convert(Protos.FrameworkID id) {
return convert(id, org.apache.mesos.Protos.FrameworkID.newBuilder());
}
+ public static org.apache.mesos.Protos.MasterInfo convert(Protos.MasterInfo id) {
+ return convert(id, org.apache.mesos.Protos.MasterInfo.newBuilder());
+ }
+
public static org.apache.mesos.Protos.TaskID convert(Protos.TaskID id) {
return convert(id, org.apache.mesos.Protos.TaskID.newBuilder());
}
@@ -96,4 +116,12 @@ public final class ProtosConversion {
public static org.apache.mesos.Protos.Offer convert(Protos.Offer f) {
return convert(f, org.apache.mesos.Protos.Offer.newBuilder());
}
+
+ public static org.apache.mesos.Protos.ExecutorID convert(Protos.ExecutorID f) {
+ return convert(f, org.apache.mesos.Protos.ExecutorID.newBuilder());
+ }
+
+ public static org.apache.mesos.Protos.SlaveID convert(Protos.AgentID f) {
+ return convert(f, org.apache.mesos.Protos.SlaveID.newBuilder());
+ }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
index 5519323..10d4f1b 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
@@ -18,38 +18,58 @@ import java.util.concurrent.Executor;
import javax.inject.Singleton;
import com.google.inject.AbstractModule;
-import com.google.inject.PrivateModule;
+import org.apache.aurora.scheduler.app.SchedulerMain;
import org.apache.aurora.scheduler.base.AsyncUtil;
import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.mesos.MesosCallbackHandler.MesosCallbackHandlerImpl;
import org.apache.mesos.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkState;
+
/**
* A module that creates a {@link Driver} binding.
*/
public class SchedulerDriverModule extends AbstractModule {
private static final Logger LOG = LoggerFactory.getLogger(SchedulerDriverModule.class);
+ private final SchedulerMain.DriverKind kind;
+
+ public SchedulerDriverModule(SchedulerMain.DriverKind kind) {
+ this.kind = kind;
+ }
@Override
protected void configure() {
- install(new PrivateModule() {
- @Override
- protected void configure() {
+ bind(Scheduler.class).to(MesosSchedulerImpl.class);
+ bind(org.apache.mesos.v1.scheduler.Scheduler.class).to(VersionedMesosSchedulerImpl.class);
+ bind(MesosSchedulerImpl.class).in(Singleton.class);
+ bind(MesosCallbackHandler.class).to(MesosCallbackHandlerImpl.class);
+ bind(MesosCallbackHandlerImpl.class).in(Singleton.class);
+ // TODO(zmanji): Create singleThreadedExecutor (non-scheduled) variant.
+ bind(Executor.class).annotatedWith(MesosCallbackHandlerImpl.SchedulerExecutor.class)
+ .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SchedulerImpl-%d", LOG));
+
+ switch (kind) {
+ case SCHEDULER_DRIVER:
bind(Driver.class).to(SchedulerDriverService.class);
bind(SchedulerDriverService.class).in(Singleton.class);
- expose(Driver.class);
-
- bind(Scheduler.class).to(MesosSchedulerImpl.class);
- bind(MesosSchedulerImpl.class).in(Singleton.class);
- expose(Scheduler.class);
-
- // TODO(zmanji): Create singleThreadedExecutor (non-scheduled) variant.
- bind(Executor.class).annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)
- .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SchedulerImpl-%d", LOG));
- }
- });
+ break;
+ case V0_DRIVER:
+ bind(Driver.class).to(VersionedSchedulerDriverService.class);
+ bind(VersionedSchedulerDriverService.class).in(Singleton.class);
+ PubsubEventModule.bindSubscriber(binder(), VersionedSchedulerDriverService.class);
+ break;
+ case V1_DRIVER:
+ bind(Driver.class).to(VersionedSchedulerDriverService.class);
+ bind(VersionedSchedulerDriverService.class).in(Singleton.class);
+ PubsubEventModule.bindSubscriber(binder(), VersionedSchedulerDriverService.class);
+ break;
+ default:
+ checkState(false, "Unknown driver kind.");
+ break;
+ }
PubsubEventModule.bindSubscriber(binder(), TaskStatusStats.class);
bind(TaskStatusStats.class).in(Singleton.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/VersionedDriverFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedDriverFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedDriverFactory.java
new file mode 100644
index 0000000..8afeec1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedDriverFactory.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.base.Optional;
+
+import org.apache.mesos.v1.Protos;
+import org.apache.mesos.v1.scheduler.Mesos;
+import org.apache.mesos.v1.scheduler.Scheduler;
+
+/**
+ * Creates V1 {@link org.apache.mesos.v1.scheduler.Mesos} drivers. Construction goes through this
+ * interface because the {@link Mesos} implementations statically load libmesos.
+ */
+public interface VersionedDriverFactory {
+  Mesos create(
+      Scheduler scheduler,                        // target of the driver's V1 callbacks
+      Protos.FrameworkInfo frameworkInfo,         // framework description (may carry a saved ID)
+      String master,                              // master URI to connect to
+      Optional<Protos.Credential> credentials);  // absent: presumably connect unauthenticated
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
new file mode 100644
index 0000000..84e3f47
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.util.List;
+import javax.inject.Inject;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
+
+import org.apache.aurora.common.inject.TimedInterceptor;
+import org.apache.aurora.scheduler.stats.CachedCounters;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.mesos.v1.Protos;
+import org.apache.mesos.v1.scheduler.Mesos;
+import org.apache.mesos.v1.scheduler.Protos.Call;
+import org.apache.mesos.v1.scheduler.Protos.Event;
+import org.apache.mesos.v1.scheduler.Scheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Implementation of the V1 {@link Scheduler} callback interface.
+ *
+ * <p>{@link #connected} sends the SUBSCRIBE call. Every received {@link Event} is counted by
+ * type and, unless ignored, forwarded to a {@link MesosCallbackHandler}.
+ */
+public class VersionedMesosSchedulerImpl implements Scheduler {
+  private static final Logger LOG = LoggerFactory.getLogger(VersionedMesosSchedulerImpl.class);
+
+  private final CachedCounters counters;
+  private final MesosCallbackHandler handler;
+  private final Storage storage;
+  private final DriverSettings settings;
+
+  // Set by the first SUBSCRIBED event; subsequent SUBSCRIBED events are re-registrations.
+  private volatile boolean isRegistered = false;
+
+  private static final String EVENT_COUNTER_STAT_PREFIX = "mesos_scheduler_event_";
+  // A cache to hold the metric names to prevent us from creating strings for every event.
+  private final LoadingCache<Event.Type, String> eventMetricNameCache = CacheBuilder.newBuilder()
+      .maximumSize(Event.Type.values().length)
+      .initialCapacity(Event.Type.values().length)
+      .build(new CacheLoader<Event.Type, String>() {
+        @Override
+        public String load(Event.Type key) throws Exception {
+          return EVENT_COUNTER_STAT_PREFIX + key.name();
+        }
+      });
+
+  @Inject
+  VersionedMesosSchedulerImpl(
+      MesosCallbackHandler handler,
+      CachedCounters counters,
+      Storage storage,
+      DriverSettings settings) {
+    this.handler = requireNonNull(handler);
+    this.counters = requireNonNull(counters);
+    this.storage = requireNonNull(storage);
+    this.settings = requireNonNull(settings);
+    initializeEventMetrics();
+  }
+
+  /**
+   * Sends a SUBSCRIBE call, reusing the persisted framework ID when storage has one.
+   *
+   * @param mesos The connected V1 driver used to send the call.
+   */
+  @Override
+  public void connected(Mesos mesos) {
+    LOG.info("Connected to Mesos master.");
+
+    Optional<String> frameworkId = storage.read(
+        storeProvider -> storeProvider.getSchedulerStore().fetchFrameworkId());
+
+    Protos.FrameworkInfo.Builder frameworkBuilder = settings.getFrameworkInfo().toBuilder();
+
+    Call.Builder call = Call.newBuilder().setType(Call.Type.SUBSCRIBE);
+
+    if (frameworkId.isPresent()) {
+      // Log the raw ID; concatenating the Optional itself would print "Optional.of(...)".
+      LOG.info("Found persisted framework ID: " + frameworkId.get());
+      Protos.FrameworkID id = Protos.FrameworkID.newBuilder().setValue(frameworkId.get()).build();
+      frameworkBuilder.setId(id);
+      call.setFrameworkId(id);
+    } else {
+      // Drop any ID carried by the configured FrameworkInfo so we subscribe as a new framework.
+      frameworkBuilder.clearId();
+      call.clearFrameworkId();
+      LOG.warn("Did not find a persisted framework ID, connecting as a new framework.");
+    }
+
+    LOG.info("Sending subscribe call");
+    mesos.send(call.setSubscribe(Call.Subscribe.newBuilder()
+        .setFrameworkInfo(frameworkBuilder.build())
+        .build())
+        .build());
+  }
+
+  /** Reports loss of the master connection to the callback handler. */
+  @Override
+  public void disconnected(Mesos mesos) {
+    handler.handleDisconnection();
+  }
+
+  /**
+   * Touches the counter for every {@link Event.Type} so that all per-type metrics exist,
+   * starting at 0, before any event arrives.
+   */
+  private void initializeEventMetrics() {
+    for (Event.Type type : Event.Type.values()) {
+      this.counters.get(eventMetricNameCache.getUnchecked(type));
+    }
+  }
+
+  /** Increments the counter for the given event's type. */
+  private void countEventMetrics(Event event) {
+    this.counters.get(eventMetricNameCache.getUnchecked(event.getType())).incrementAndGet();
+  }
+
+  /**
+   * Counts the event by type and dispatches it to the {@link MesosCallbackHandler}.
+   *
+   * <p>Inverse offers and their rescinds are logged and ignored; heartbeats are ignored.
+   *
+   * @throws IllegalStateException if an OFFERS event arrives before the first SUBSCRIBED event.
+   */
+  @TimedInterceptor.Timed("scheduler_received")
+  @Override
+  public void received(Mesos mesos, Event event) {
+    countEventMetrics(event);
+    switch (event.getType()) {
+      case SUBSCRIBED:
+        Event.Subscribed subscribed = event.getSubscribed();
+        if (isRegistered) {
+          handler.handleReregistration(subscribed.getMasterInfo());
+        } else {
+          handler.handleRegistration(subscribed.getFrameworkId(), subscribed.getMasterInfo());
+          isRegistered = true;
+        }
+        break;
+
+      case OFFERS:
+        checkState(isRegistered, "Must be registered before receiving offers.");
+        handler.handleOffers(event.getOffers().getOffersList());
+        break;
+
+      case RESCIND:
+        handler.handleRescind(event.getRescind().getOfferId());
+        break;
+
+      case INVERSE_OFFERS:
+        List<Protos.InverseOffer> offers = event.getInverseOffers().getInverseOffersList();
+        String ids = Joiner.on(",").join(
+            Lists.transform(offers, input -> input.getId().getValue()));
+        LOG.warn("Ignoring inverse offers: {}", ids);
+        break;
+
+      case RESCIND_INVERSE_OFFER:
+        Protos.OfferID id = event.getRescindInverseOffer().getInverseOfferId();
+        LOG.warn("Ignoring rescinded inverse offer: {}", id);
+        break;
+
+      case UPDATE:
+        Protos.TaskStatus status = event.getUpdate().getStatus();
+        handler.handleUpdate(status);
+        break;
+
+      case MESSAGE:
+        Event.Message m = event.getMessage();
+        handler.handleMessage(m.getExecutorId(), m.getAgentId());
+        break;
+
+      case ERROR:
+        handler.handleError(event.getError().getMessage());
+        break;
+
+      case FAILURE:
+        // With an executor ID this reports a lost executor; otherwise a lost agent.
+        Event.Failure failure = event.getFailure();
+        if (failure.hasExecutorId()) {
+          handler.handleLostExecutor(
+              failure.getExecutorId(),
+              failure.getAgentId(),
+              failure.getStatus());
+        } else {
+          handler.handleLostAgent(failure.getAgentId());
+        }
+        break;
+
+      // TODO(zmanji): handle HEARTBEAT in a graceful manner
+      // For now it is ok to silently ignore heart beats because the driver will
+      // detect disconnections for us.
+      case HEARTBEAT:
+        break;
+
+      default:
+        LOG.warn("Unknown event from Mesos \n{}", event);
+        break;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java
new file mode 100644
index 0000000..9f39aeb
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverService.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Collections2;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.inject.Inject;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.mesos.v1.Protos.Filters;
+import org.apache.mesos.v1.Protos.FrameworkID;
+import org.apache.mesos.v1.Protos.FrameworkInfo;
+import org.apache.mesos.v1.Protos.Offer.Operation;
+import org.apache.mesos.v1.Protos.OfferID;
+import org.apache.mesos.v1.Protos.TaskID;
+import org.apache.mesos.v1.Protos.TaskStatus;
+import org.apache.mesos.v1.scheduler.Mesos;
+import org.apache.mesos.v1.scheduler.Protos.Call;
+import org.apache.mesos.v1.scheduler.Scheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * A {@link Driver} implementation that uses the V1 API drivers from libmesos.
+ *
+ * <p>The {@link Mesos} driver is created in {@link #startUp()}. Each outbound call first blocks
+ * until a {@link DriverRegistered} event has been received, because the framework ID needed to
+ * build a {@link Call} is only guaranteed to be available after registration.
+ */
+class VersionedSchedulerDriverService extends AbstractIdleService
+    implements Driver, EventSubscriber {
+  private static final Logger LOG = LoggerFactory.getLogger(VersionedSchedulerDriverService.class);
+
+  private final Storage storage;
+  private final DriverSettings driverSettings;
+  private final Scheduler scheduler;
+  private final VersionedDriverFactory factory;
+  // Completed by startUp() once the V1 driver has been created.
+  private final SettableFuture<Mesos> mesosFuture = SettableFuture.create();
+  // Released by shutDown() or abort(); awaited by blockUntilStopped().
+  private final CountDownLatch terminationLatch = new CountDownLatch(1);
+  // Released when DriverRegistered is received; awaited before every outbound call.
+  private final CountDownLatch registrationLatch = new CountDownLatch(1);
+
+  @Inject
+  VersionedSchedulerDriverService(
+      Storage storage,
+      DriverSettings settings,
+      Scheduler scheduler,
+      VersionedDriverFactory factory) {
+    this.storage = requireNonNull(storage);
+    this.driverSettings = requireNonNull(settings);
+    this.scheduler = requireNonNull(scheduler);
+    this.factory = requireNonNull(factory);
+  }
+
+  /**
+   * Reads the persisted framework ID. Only called after registration, when the ID is expected to
+   * be stored; {@code Optional.get()} throws otherwise.
+   */
+  private FrameworkID getFrameworkId() {
+    String id = storage.read(storeProvider ->
+        storeProvider.getSchedulerStore().fetchFrameworkId().get());
+    return FrameworkID.newBuilder().setValue(id).build();
+  }
+
+  /**
+   * Creates the V1 driver, seeding its {@link FrameworkInfo} with the persisted framework ID when
+   * one exists.
+   */
+  @Override
+  protected void startUp() throws Exception {
+    Optional<String> frameworkId = storage.read(
+        storeProvider -> storeProvider.getSchedulerStore().fetchFrameworkId());
+
+    LOG.info("Connecting to mesos master: " + driverSettings.getMasterUri());
+    if (!driverSettings.getCredentials().isPresent()) {
+      LOG.warn("Connecting to master without authentication!");
+    }
+
+    FrameworkInfo.Builder frameworkBuilder = driverSettings.getFrameworkInfo().toBuilder();
+
+    if (frameworkId.isPresent()) {
+      // Log the raw ID; concatenating the Optional itself would print "Optional.of(...)".
+      LOG.info("Found persisted framework ID: " + frameworkId.get());
+      frameworkBuilder.setId(FrameworkID.newBuilder().setValue(frameworkId.get()));
+    } else {
+      LOG.warn("Did not find a persisted framework ID, connecting as a new framework.");
+    }
+
+    Mesos mesos = factory.create(
+        scheduler,
+        frameworkBuilder.build(),
+        driverSettings.getMasterUri(),
+        driverSettings.getCredentials());
+
+    mesosFuture.set(mesos);
+  }
+
+  /** Releases any thread blocked in {@link #blockUntilStopped()}. */
+  @Override
+  protected void shutDown() throws Exception {
+    terminationLatch.countDown();
+  }
+
+  @Override
+  public void acceptOffers(OfferID offerId, Collection<Operation> operations, Filters filter) {
+    whenRegistered(() -> {
+      LOG.info("Accepting offer {} with ops {}", offerId, operations);
+
+      Futures.getUnchecked(mesosFuture).send(
+          Call.newBuilder()
+              .setFrameworkId(getFrameworkId())
+              .setType(Call.Type.ACCEPT)
+              .setAccept(
+                  Call.Accept.newBuilder()
+                      .addOfferIds(offerId)
+                      .addAllOperations(operations)
+                      .setFilters(filter))
+              .build());
+    });
+  }
+
+  @Override
+  public void declineOffer(OfferID offerId, Filters filter) {
+    whenRegistered(() -> {
+      LOG.info("Declining offer {}", offerId.getValue());
+
+      Futures.getUnchecked(mesosFuture).send(
+          Call.newBuilder().setType(Call.Type.DECLINE)
+              .setFrameworkId(getFrameworkId())
+              .setDecline(
+                  Call.Decline.newBuilder()
+                      .setFilters(filter)
+                      .addOfferIds(offerId))
+              .build()
+      );
+    });
+  }
+
+  @Override
+  public void killTask(String taskId) {
+    whenRegistered(() -> {
+      LOG.info("Killing task {}", taskId);
+
+      Futures.getUnchecked(mesosFuture).send(
+          Call.newBuilder().setType(Call.Type.KILL)
+              .setFrameworkId(getFrameworkId())
+              .setKill(
+                  Call.Kill.newBuilder()
+                      .setTaskId(TaskID.newBuilder().setValue(taskId)))
+              .build()
+      );
+    });
+  }
+
+  /**
+   * Acknowledges a status update. Updates without a UUID are dropped: the Mesos API only expects
+   * frameworks to acknowledge updates that carry one, and the V1 driver logs every time it is
+   * given an update without a UUID. The check runs after registration, so callers still block.
+   */
+  @Override
+  public void acknowledgeStatusUpdate(TaskStatus status) {
+    whenRegistered(() -> {
+      if (!status.hasUuid()) {
+        return;
+      }
+
+      LOG.info("Acking status update for {} with uuid: {}",
+          status.getTaskId().getValue(),
+          status.getUuid());
+
+      Futures.getUnchecked(mesosFuture).send(
+          Call.newBuilder().setType(Call.Type.ACKNOWLEDGE)
+              .setFrameworkId(getFrameworkId())
+              .setAcknowledge(
+                  Call.Acknowledge.newBuilder()
+                      .setAgentId(status.getAgentId())
+                      .setTaskId(status.getTaskId())
+                      .setUuid(status.getUuid()))
+              .build()
+      );
+    });
+  }
+
+  @Override
+  public void reconcileTasks(Collection<TaskStatus> statuses) {
+    whenRegistered(() -> {
+      Collection<Call.Reconcile.Task> tasks = Collections2.transform(statuses, taskStatus ->
+          Call.Reconcile.Task.newBuilder()
+              .setTaskId(taskStatus.getTaskId())
+              .build());
+
+      Futures.getUnchecked(mesosFuture).send(
+          Call.newBuilder()
+              .setType(Call.Type.RECONCILE)
+              .setFrameworkId(getFrameworkId())
+              .setReconcile(
+                  Call.Reconcile.newBuilder()
+                      .addAllTasks(tasks))
+              .build()
+      );
+    });
+  }
+
+  /**
+   * Blocks until the service is shut down or aborted.
+   *
+   * @throws IllegalStateException if the service is not running.
+   * @throws RuntimeException wrapping {@link InterruptedException} if interrupted while waiting;
+   *     the thread's interrupt flag is restored first.
+   */
+  @Override
+  public void blockUntilStopped() {
+    ensureRunning();
+    try {
+      terminationLatch.await();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Releases {@link #blockUntilStopped()} waiters and asynchronously stops the service. */
+  @Override
+  public void abort() {
+    terminationLatch.countDown();
+    stopAsync();
+  }
+
+  /** Unblocks pending and future outbound calls once the framework has registered. */
+  @Subscribe
+  public void registered(DriverRegistered event) {
+    registrationLatch.countDown();
+  }
+
+  /**
+   * Runs {@code c} once registered, blocking the caller until then. Registration is when the
+   * framework ID is guaranteed to be available; without it no {@link Call} can be built.
+   *
+   * @throws IllegalStateException if the service is not running.
+   */
+  private void whenRegistered(Command c) {
+    ensureRunning();
+    try {
+      registrationLatch.await();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag before propagating so the caller can observe it.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+    c.execute();
+  }
+
+  /** Fails fast with {@link IllegalStateException} unless the service is running. */
+  private void ensureRunning() {
+    checkState(isRunning(), "Driver is not running.");
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 0551804..f579975 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.hash.Hashing;
+import com.google.common.net.InetAddresses;
import com.google.common.util.concurrent.Atomics;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -116,6 +117,10 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
private static final String SERVERSET_PATH = "/fake/service/path";
private static final String STATS_URL_PREFIX = "fake_url";
private static final String FRAMEWORK_ID = "integration_test_framework_id";
+ private static final Protos.MasterInfo MASTER = Protos.MasterInfo.newBuilder()
+ .setId("master-id")
+ .setIp(InetAddresses.coerceToInteger(InetAddresses.forString("1.2.3.4"))) //NOPMD
+ .setPort(5050).build();
private static final IHostAttributes HOST_ATTRIBUTES = IHostAttributes.build(new HostAttributes()
.setHost("host")
.setSlaveId("slave-id")
@@ -336,7 +341,7 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
scheduler.getValue().registered(
driver,
Protos.FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
- Protos.MasterInfo.getDefaultInstance());
+ MASTER);
awaitSchedulerReady();
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
new file mode 100644
index 0000000..80f631e
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
@@ -0,0 +1,431 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.net.InetAddresses;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.aurora.common.application.Lifecycle;
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.TaskStatusHandler;
+import org.apache.aurora.scheduler.base.Conversions;
+import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.mesos.v1.Protos;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link MesosCallbackHandler.MesosCallbackHandlerImpl}. Each test records the
+ * storage, offer manager, event sink, status handler and lifecycle interactions expected
+ * for a single Mesos callback, then checks any stats counters the callback updates.
+ */
+public class MesosCallbackHandlerTest extends EasyMockTest {
+  private static final Protos.AgentID AGENT_ID =
+      Protos.AgentID.newBuilder().setValue("agent-id").build();
+
+  private static final String MASTER_ID = "master-id";
+  private static final Protos.MasterInfo MASTER = Protos.MasterInfo.newBuilder()
+      .setId(MASTER_ID)
+      .setIp(InetAddresses.coerceToInteger(InetAddresses.forString("1.2.3.4"))) //NOPMD
+      .setPort(5050).build();
+
+  private static final Protos.ExecutorID EXECUTOR_ID =
+      Protos.ExecutorID.newBuilder().setValue("executor-id").build();
+
+  private static final String FRAMEWORK_ID = "framework-id";
+  private static final Protos.FrameworkID FRAMEWORK =
+      Protos.FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build();
+
+  private static final String AGENT_HOST = "agent-hostname";
+
+  private static final Protos.OfferID OFFER_ID =
+      Protos.OfferID.newBuilder().setValue("offer-id").build();
+  private static final Protos.Offer OFFER = Protos.Offer.newBuilder()
+      .setFrameworkId(FRAMEWORK)
+      .setAgentId(AGENT_ID)
+      .setHostname(AGENT_HOST)
+      .setId(OFFER_ID)
+      .build();
+
+  private static final HostOffer HOST_OFFER = hostOffer(OFFER, NONE);
+
+  private static final Protos.AgentID AGENT_ID_2 =
+      Protos.AgentID.newBuilder().setValue("agent-id2").build();
+  private static final String AGENT_HOST_2 = "agent2-hostname";
+  private static final Protos.OfferID OFFER_ID_2 =
+      Protos.OfferID.newBuilder().setValue("offer-id2").build();
+  private static final Protos.Offer OFFER_2 = Protos.Offer.newBuilder()
+      .setFrameworkId(FRAMEWORK)
+      .setAgentId(AGENT_ID_2)
+      .setHostname(AGENT_HOST_2)
+      .setId(OFFER_ID_2)
+      .build();
+
+  private static final HostOffer HOST_OFFER_2 = hostOffer(OFFER_2, NONE);
+
+  // The same offer as HOST_OFFER, but from a host in DRAINING maintenance mode.
+  private static final HostOffer DRAINING_HOST_OFFER = hostOffer(OFFER, DRAINING);
+
+  private static final Protos.TaskStatus STATUS_NO_REASON = Protos.TaskStatus.newBuilder()
+      .setState(Protos.TaskState.TASK_RUNNING)
+      .setSource(Protos.TaskStatus.Source.SOURCE_AGENT)
+      .setMessage("message")
+      .setTimestamp(1D)
+      .setTaskId(Protos.TaskID.newBuilder().setValue("task-id").build())
+      .build();
+
+  // STATUS_NO_REASON's 1-second timestamp as it is expected to appear, in microseconds, on
+  // the TaskStatusReceived event.
+  private static final long STATUS_TIMESTAMP_MICROS = 1000000L;
+
+  private static final Protos.TaskStatus STATUS = STATUS_NO_REASON
+      .toBuilder()
+      // Only testing data plumbing; this reason would not normally accompany TASK_RUNNING.
+      .setReason(Protos.TaskStatus.Reason.REASON_COMMAND_EXECUTOR_FAILED)
+      .build();
+
+  private static final Protos.TaskStatus STATUS_RECONCILIATION = STATUS_NO_REASON
+      .toBuilder()
+      .setReason(Protos.TaskStatus.Reason.REASON_RECONCILIATION)
+      .build();
+
+  private StorageTestUtil storageUtil;
+  private Command shutdownCommand;
+  private TaskStatusHandler statusHandler;
+  private OfferManager offerManager;
+  private EventSink eventSink;
+  private FakeStatsProvider statsProvider;
+  private Logger injectedLog;
+
+  private MesosCallbackHandler handler;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    shutdownCommand = createMock(Command.class);
+    statusHandler = createMock(TaskStatusHandler.class);
+    offerManager = createMock(OfferManager.class);
+    eventSink = createMock(EventSink.class);
+    statsProvider = new FakeStatsProvider();
+
+    createHandler(false);
+  }
+
+  /**
+   * (Re)creates {@link #handler} around the current collaborators.
+   *
+   * @param mockLogger If {@code true}, inject an EasyMock {@link Logger} so a test can set
+   *     expectations on log calls; otherwise inject a real SLF4J logger.
+   */
+  private void createHandler(boolean mockLogger) {
+    if (mockLogger) {
+      injectedLog = createMock(Logger.class);
+    } else {
+      injectedLog = LoggerFactory.getLogger("MesosCallbackHandlerTestLogger");
+    }
+
+    handler = new MesosCallbackHandler.MesosCallbackHandlerImpl(
+        storageUtil.storage,
+        new Lifecycle(shutdownCommand), // Cannot mock lifecycle
+        statusHandler,
+        offerManager,
+        eventSink,
+        MoreExecutors.directExecutor(),
+        injectedLog,
+        statsProvider);
+  }
+
+  /**
+   * Builds a {@link HostOffer} for {@code offer} whose attributes name the offer's host and
+   * agent, carry the given maintenance {@code mode}, and have no additional attributes.
+   */
+  private static HostOffer hostOffer(Protos.Offer offer, MaintenanceMode mode) {
+    return new HostOffer(
+        offer,
+        IHostAttributes.build(new HostAttributes()
+            .setHost(offer.getHostname())
+            .setSlaveId(offer.getAgentId().getValue())
+            .setMode(mode)
+            .setAttributes(ImmutableSet.of())));
+  }
+
+  /**
+   * Expects a lookup of {@code offer}'s host attributes that finds nothing, followed by a save
+   * of the offer's attributes with maintenance mode NONE.
+   */
+  private void expectOfferAttributesSaved(HostOffer offer) {
+    expect(storageUtil.attributeStore.getHostAttributes(offer.getOffer().getHostname()))
+        .andReturn(Optional.absent());
+    IHostAttributes defaultMode = IHostAttributes.build(
+        Conversions.getAttributes(offer.getOffer()).newBuilder().setMode(NONE));
+    expect(storageUtil.attributeStore.saveHostAttributes(defaultMode)).andReturn(true);
+  }
+
+  /**
+   * Expects a {@link PubsubEvent.TaskStatusReceived} event to be posted for {@code status}.
+   *
+   * @param status Status whose state is expected on the event.
+   * @param source Expected event source; {@code Optional.absent()} if the status has none set.
+   * @param reason Expected event reason; {@code Optional.absent()} if the status has none set.
+   */
+  private void expectStatusReceivedEvent(
+      Protos.TaskStatus status,
+      Optional<Protos.TaskStatus.Source> source,
+      Optional<Protos.TaskStatus.Reason> reason) {
+    eventSink.post(new PubsubEvent.TaskStatusReceived(
+        status.getState(),
+        source,
+        reason,
+        Optional.of(STATUS_TIMESTAMP_MICROS)));
+  }
+
+  @Test
+  public void testRegistration() {
+    storageUtil.expectOperations();
+    storageUtil.schedulerStore.saveFrameworkId(FRAMEWORK_ID);
+    eventSink.post(new PubsubEvent.DriverRegistered());
+
+    control.replay();
+
+    handler.handleRegistration(FRAMEWORK, MASTER);
+  }
+
+  @Test
+  public void testReRegistration() {
+    control.replay();
+
+    handler.handleReregistration(MASTER);
+    assertEquals(1L, statsProvider.getLongValue("scheduler_framework_reregisters"));
+  }
+
+  @Test
+  public void testGetEmptyOfferList() {
+    control.replay();
+
+    handler.handleOffers(ImmutableList.of());
+  }
+
+  @Test
+  public void testOffers() {
+    storageUtil.expectOperations();
+    expectOfferAttributesSaved(HOST_OFFER);
+    offerManager.addOffer(HOST_OFFER);
+
+    control.replay();
+
+    handler.handleOffers(ImmutableList.of(HOST_OFFER.getOffer()));
+    assertEquals(1L, statsProvider.getLongValue("scheduler_resource_offers"));
+  }
+
+  @Test
+  public void testMultipleOffers() {
+    storageUtil.expectOperations();
+    expectOfferAttributesSaved(HOST_OFFER);
+    expectOfferAttributesSaved(HOST_OFFER_2);
+    offerManager.addOffer(HOST_OFFER);
+    offerManager.addOffer(HOST_OFFER_2);
+
+    control.replay();
+
+    handler.handleOffers(ImmutableList.of(HOST_OFFER.getOffer(), HOST_OFFER_2.getOffer()));
+    assertEquals(2L, statsProvider.getLongValue("scheduler_resource_offers"));
+  }
+
+  @Test
+  public void testModePreservedWhenOfferAdded() {
+    storageUtil.expectOperations();
+
+    IHostAttributes draining =
+        IHostAttributes.build(HOST_OFFER.getAttributes().newBuilder().setMode(DRAINING));
+    expect(storageUtil.attributeStore.getHostAttributes(AGENT_HOST))
+        .andReturn(Optional.of(draining));
+
+    IHostAttributes saved = IHostAttributes.build(
+        Conversions.getAttributes(HOST_OFFER.getOffer()).newBuilder().setMode(DRAINING));
+    expect(storageUtil.attributeStore.saveHostAttributes(saved)).andReturn(true);
+
+    // The stored DRAINING mode must be preserved on the offer handed to the offer manager.
+    offerManager.addOffer(DRAINING_HOST_OFFER);
+
+    control.replay();
+
+    handler.handleOffers(ImmutableList.of(HOST_OFFER.getOffer()));
+    assertEquals(1L, statsProvider.getLongValue("scheduler_resource_offers"));
+  }
+
+  @Test
+  public void testDisconnection() {
+    eventSink.post(new PubsubEvent.DriverDisconnected());
+
+    control.replay();
+
+    handler.handleDisconnection();
+    assertEquals(1L, statsProvider.getLongValue("scheduler_framework_disconnects"));
+  }
+
+  @Test
+  public void testRescind() {
+    offerManager.cancelOffer(OFFER_ID);
+
+    control.replay();
+
+    handler.handleRescind(OFFER_ID);
+    assertEquals(1L, statsProvider.getLongValue("offers_rescinded"));
+  }
+
+  @Test
+  public void testError() {
+    // An error callback should trigger the shutdown command via the lifecycle.
+    shutdownCommand.execute();
+
+    control.replay();
+
+    handler.handleError("Something bad happened!");
+  }
+
+  @Test
+  public void testUpdate() {
+    expectStatusReceivedEvent(
+        STATUS, Optional.of(STATUS.getSource()), Optional.of(STATUS.getReason()));
+    statusHandler.statusUpdate(STATUS);
+
+    control.replay();
+
+    handler.handleUpdate(STATUS);
+  }
+
+  @Test
+  public void testUpdateNoSource() {
+    Protos.TaskStatus status = STATUS.toBuilder().clearSource().build();
+
+    expectStatusReceivedEvent(status, Optional.absent(), Optional.of(status.getReason()));
+    statusHandler.statusUpdate(status);
+
+    control.replay();
+
+    handler.handleUpdate(status);
+  }
+
+  @Test
+  public void testUpdateNoReason() {
+    Protos.TaskStatus status = STATUS.toBuilder().clearReason().build();
+
+    expectStatusReceivedEvent(status, Optional.of(status.getSource()), Optional.absent());
+    statusHandler.statusUpdate(status);
+
+    control.replay();
+
+    handler.handleUpdate(status);
+  }
+
+  @Test
+  public void testUpdateNoMessage() {
+    Protos.TaskStatus status = STATUS.toBuilder().clearMessage().build();
+
+    expectStatusReceivedEvent(
+        status, Optional.of(status.getSource()), Optional.of(status.getReason()));
+    statusHandler.statusUpdate(status);
+
+    control.replay();
+
+    handler.handleUpdate(status);
+  }
+
+  @Test(expected = SchedulerException.class)
+  public void testUpdateWithException() {
+    expectStatusReceivedEvent(
+        STATUS, Optional.of(STATUS.getSource()), Optional.of(STATUS.getReason()));
+    statusHandler.statusUpdate(STATUS);
+    expectLastCall().andThrow(new Storage.StorageException("Storage Failure"));
+
+    control.replay();
+
+    handler.handleUpdate(STATUS);
+  }
+
+  @Test
+  public void testReconciliationUpdateLogging() {
+    // Mock the logger so we can verify that reconciliation updates are logged at debug level.
+    createHandler(true);
+    String expectedMsg = "Received status update for task task-id in state TASK_RUNNING from "
+        + "SOURCE_AGENT with REASON_RECONCILIATION: message";
+
+    injectedLog.debug(expectedMsg);
+    expectLastCall().once();
+
+    expectStatusReceivedEvent(
+        STATUS_RECONCILIATION,
+        Optional.of(STATUS_RECONCILIATION.getSource()),
+        Optional.of(STATUS_RECONCILIATION.getReason()));
+    statusHandler.statusUpdate(STATUS_RECONCILIATION);
+
+    control.replay();
+
+    handler.handleUpdate(STATUS_RECONCILIATION);
+  }
+
+  @Test
+  public void testLostAgent() {
+    control.replay();
+
+    handler.handleLostAgent(AGENT_ID);
+    assertEquals(1L, statsProvider.getLongValue("slaves_lost"));
+  }
+
+  @Test
+  public void testLostExecutorIgnoresOkStatus() {
+    control.replay();
+
+    handler.handleLostExecutor(EXECUTOR_ID, AGENT_ID, 0);
+    assertEquals(0L, statsProvider.getLongValue("scheduler_lost_executors"));
+  }
+
+  @Test
+  public void testLostExecutor() {
+    control.replay();
+
+    handler.handleLostExecutor(EXECUTOR_ID, AGENT_ID, 1);
+    assertEquals(1L, statsProvider.getLongValue("scheduler_lost_executors"));
+  }
+
+  @Test
+  public void testMessage() {
+    // Framework messages should be ignored.
+    control.replay();
+
+    handler.handleMessage(EXECUTOR_ID, AGENT_ID);
+  }
+}