You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/11/08 00:11:30 UTC
[2/3] incubator-aurora git commit: Simplify management of the driver
lifecycle using AbstractIdleService.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
new file mode 100644
index 0000000..bb227fd
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.protobuf.ByteString;
+import com.twitter.common.quantity.Data;
+
+import org.apache.aurora.Protobufs;
+import org.apache.aurora.codec.ThriftBinaryCodec;
+import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.base.CommandUtil;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+/**
+ * A factory to create Mesos task objects.
+ */
+public interface MesosTaskFactory {
+
+ /**
+ * Creates a Mesos task object.
+ *
+ * @param task Assigned task to translate into a task object.
+ * @param slaveId Id of the slave the task is being assigned to.
+ * @return A new task.
+ * @throws SchedulerException If the task could not be encoded.
+ */
+ TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException;
+ /** Holds the executor path; the constructor passes it through checkNotBlank. */
+ class ExecutorConfig {
+ private final String executorPath;
+
+ public ExecutorConfig(String executorPath) {
+ this.executorPath = checkNotBlank(executorPath);
+ }
+
+ String getExecutorPath() {
+ return executorPath;
+ }
+ }
+
+ // TODO(wfarner): Move this class to its own file to reduce visibility to package private.
+ class MesosTaskFactoryImpl implements MesosTaskFactory {
+ private static final Logger LOG = Logger.getLogger(MesosTaskFactoryImpl.class.getName());
+ private static final String EXECUTOR_PREFIX = "thermos-"; // Prepended to task IDs.
+
+ /**
+ * Name to associate with task executors.
+ */
+ @VisibleForTesting
+ static final String EXECUTOR_NAME = "aurora.task";
+
+ private final String executorPath;
+
+ @Inject
+ MesosTaskFactoryImpl(ExecutorConfig executorConfig) {
+ this.executorPath = executorConfig.getExecutorPath();
+ }
+ /** Builds the executor ID for a task: EXECUTOR_PREFIX followed by the task ID. */
+ @VisibleForTesting
+ static ExecutorID getExecutorId(String taskId) {
+ return ExecutorID.newBuilder().setValue(EXECUTOR_PREFIX + taskId).build();
+ }
+ /** Returns {@code role.environment.name} for the given job key. */
+ public static String getJobSourceName(IJobKey jobkey) {
+ return String.format("%s.%s.%s", jobkey.getRole(), jobkey.getEnvironment(), jobkey.getName());
+ }
+ /** Returns the job source name of the job referenced by the task config. */
+ public static String getJobSourceName(ITaskConfig task) {
+ return getJobSourceName(task.getJob());
+ }
+ /** Returns the job source name followed by {@code .} and the instance ID. */
+ public static String getInstanceSourceName(ITaskConfig task, int instanceId) {
+ return String.format("%s.%s", getJobSourceName(task), instanceId);
+ }
+ /** Encodes the assigned task as the TaskInfo data and attaches resources and executor info. */
+ @Override
+ public TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException {
+ requireNonNull(task);
+ byte[] taskInBytes;
+ try {
+ taskInBytes = ThriftBinaryCodec.encode(task.newBuilder());
+ } catch (ThriftBinaryCodec.CodingException e) {
+ LOG.log(Level.SEVERE, "Unable to serialize task.", e);
+ throw new SchedulerException("Internal error.", e); // Cause is preserved for callers.
+ }
+ // Build the resource list from the task config plus any assigned ports.
+ ITaskConfig config = task.getTask();
+ // TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts field.
+ List<Resource> resources = Resources.from(config)
+ .toResourceList(task.isSetAssignedPorts()
+ ? ImmutableSet.copyOf(task.getAssignedPorts().values())
+ : ImmutableSet.<Integer>of());
+
+ if (LOG.isLoggable(Level.FINE)) { // Skip building the message unless FINE is enabled.
+ LOG.fine("Setting task resources to "
+ + Iterables.transform(resources, Protobufs.SHORT_TOSTRING));
+ }
+ TaskInfo.Builder taskBuilder =
+ TaskInfo.newBuilder()
+ .setName(JobKeys.canonicalString(Tasks.ASSIGNED_TO_JOB_KEY.apply(task)))
+ .setTaskId(TaskID.newBuilder().setValue(task.getTaskId()))
+ .setSlaveId(slaveId)
+ .addAllResources(resources)
+ .setData(ByteString.copyFrom(taskInBytes));
+ // The executor ID is derived from the task ID; its CPU and RAM come from ResourceSlot.
+ ExecutorInfo executor = ExecutorInfo.newBuilder()
+ .setCommand(CommandUtil.create(executorPath))
+ .setExecutorId(getExecutorId(task.getTaskId()))
+ .setName(EXECUTOR_NAME)
+ .setSource(getInstanceSourceName(config, task.getInstanceId()))
+ .addResources(
+ Resources.makeMesosResource(Resources.CPUS, ResourceSlot.EXECUTOR_CPUS.get()))
+ .addResources(Resources.makeMesosResource(
+ Resources.RAM_MB,
+ ResourceSlot.EXECUTOR_RAM.get().as(Data.MB)))
+ .build();
+ return taskBuilder
+ .setExecutor(executor)
+ .build();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
new file mode 100644
index 0000000..59ad9e6
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.util.concurrent.Executor;
+import java.util.logging.Logger;
+
+import javax.inject.Singleton;
+
+import com.google.inject.PrivateModule;
+
+import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.mesos.Scheduler;
+
+/**
+ * A private module that binds {@link Driver} to a singleton {@link SchedulerDriverService}.
+ */
+public class SchedulerDriverModule extends PrivateModule {
+ private static final Logger LOG = Logger.getLogger(SchedulerDriverModule.class.getName());
+
+ @Override
+ protected void configure() {
+ bind(Driver.class).to(SchedulerDriverService.class);
+ bind(SchedulerDriverService.class).in(Singleton.class);
+ expose(Driver.class); // The only binding this module exposes.
+
+ bind(Scheduler.class).to(MesosSchedulerImpl.class); // Not exposed; stays private.
+ bind(MesosSchedulerImpl.class).in(Singleton.class);
+
+ // TODO(zmanji): Create singleThreadedExecutor (non-scheduled) variant.
+ bind(Executor.class).annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)
+ .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SchedulerImpl-%d", LOG));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
new file mode 100644
index 0000000..88150e5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import com.twitter.common.stats.Stats;
+
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import static org.apache.mesos.Protos.Status.DRIVER_RUNNING;
+
+/**
+ * Manages the lifecycle of the scheduler driver, and provides a more constrained API to use it.
+ */
+class SchedulerDriverService extends AbstractIdleService implements Driver {
+ private static final Logger LOG = Logger.getLogger(SchedulerDriverService.class.getName());
+
+ private final AtomicLong killFailures = Stats.exportLong("scheduler_driver_kill_failures");
+ private final DriverFactory driverFactory;
+
+ private final Scheduler scheduler;
+ private final Storage storage;
+ private final DriverSettings driverSettings;
+ private final SettableFuture<SchedulerDriver> driverFuture = SettableFuture.create();
+
+ @Inject
+ SchedulerDriverService(
+ Scheduler scheduler,
+ Storage storage,
+ DriverSettings driverSettings,
+ DriverFactory driverFactory) {
+
+ this.scheduler = requireNonNull(scheduler);
+ this.storage = requireNonNull(storage);
+ this.driverSettings = requireNonNull(driverSettings);
+ this.driverFactory = requireNonNull(driverFactory);
+ }
+ /** Creates and starts the driver, reusing the stored framework ID when one is present. */
+ @Override
+ protected void startUp() {
+ Optional<String> frameworkId = storage.consistentRead(
+ new Storage.Work.Quiet<Optional<String>>() {
+ @Override
+ public Optional<String> apply(Storage.StoreProvider storeProvider) {
+ return storeProvider.getSchedulerStore().fetchFrameworkId();
+ }
+ });
+
+ LOG.info("Connecting to mesos master: " + driverSettings.getMasterUri());
+ if (!driverSettings.getCredentials().isPresent()) {
+ LOG.warning("Connecting to master without authentication!");
+ }
+
+ FrameworkInfo.Builder frameworkBuilder = driverSettings.getFrameworkInfo().toBuilder();
+
+ if (frameworkId.isPresent()) {
+ // NOTE(review): the line below logs the Optional wrapper, not frameworkId.get().
+ LOG.info("Found persisted framework ID: " + frameworkId);
+ frameworkBuilder.setId(FrameworkID.newBuilder().setValue(frameworkId.get()));
+ } else {
+ LOG.warning("Did not find a persisted framework ID, connecting as a new framework.");
+ }
+
+ SchedulerDriver schedulerDriver = driverFactory.create(
+ scheduler,
+ driverSettings.getCredentials(),
+ frameworkBuilder.build(),
+ driverSettings.getMasterUri());
+ Protos.Status status = schedulerDriver.start();
+ LOG.info("Driver started with code " + status); // NOTE(review): status is logged, not checked.
+ // Publish the started driver to the methods that read driverFuture.
+ driverFuture.set(schedulerDriver);
+ }
+ /** Waits for the driver to become available, then calls join() on it. */
+ @Override
+ public void blockUntilStopped() {
+ Futures.getUnchecked(driverFuture).join();
+ }
+ /** Stops the driver with failover set to true. */
+ @Override
+ protected void shutDown() throws ExecutionException, InterruptedException {
+ // WARNING: stop() and stop(false) are dangerous, avoid at all costs. See the docs for
+ // SchedulerDriver for more details.
+ driverFuture.get().stop(true /* failover */);
+ }
+ /** Launches one task against one offer; requires the service to be running. */
+ @Override
+ public void launchTask(Protos.OfferID offerId, Protos.TaskInfo task) {
+ checkState(isRunning(), "Driver is not running.");
+ Futures.getUnchecked(driverFuture)
+ .launchTasks(ImmutableList.of(offerId), ImmutableList.of(task));
+ }
+ /** Declines the given offer; requires the service to be running. */
+ @Override
+ public void declineOffer(Protos.OfferID offerId) {
+ checkState(isRunning(), "Driver is not running.");
+ Futures.getUnchecked(driverFuture).declineOffer(offerId);
+ }
+ /** Asks the driver to kill a task; logs and counts any status other than DRIVER_RUNNING. */
+ @Override
+ public void killTask(String taskId) {
+ checkState(isRunning(), "Driver is not running.");
+ Protos.Status status = Futures.getUnchecked(driverFuture).killTask(
+ Protos.TaskID.newBuilder().setValue(taskId).build());
+
+ if (status != DRIVER_RUNNING) { // Any other status is treated as a failed kill.
+ LOG.severe(String.format("Attempt to kill task %s failed with code %s",
+ taskId, status));
+ killFailures.incrementAndGet();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 6d2ac49..6663555 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -45,13 +45,13 @@ import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.Driver;
import org.apache.aurora.scheduler.TaskIdGenerator;
import org.apache.aurora.scheduler.async.RescheduleCalculator;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.state.SideEffect.Action;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
index fe16d3a..0186484 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -19,9 +19,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
-import org.apache.aurora.scheduler.MesosTaskFactory;
-import org.apache.aurora.scheduler.MesosTaskFactory.MesosTaskFactoryImpl;
import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl;
import org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl;
import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
import org.apache.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 78a9670..9c9b659 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -20,13 +20,13 @@ import javax.inject.Inject;
import com.google.common.base.Optional;
-import org.apache.aurora.scheduler.MesosTaskFactory;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.Resources;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.mesos.Protos.Offer;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java b/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
index cbb711f..f4fa1cb 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
@@ -14,6 +14,7 @@
package org.apache.aurora.scheduler.storage.log;
import java.nio.ByteBuffer;
+
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
index 855573e..cb95d89 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
@@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
+
import javax.annotation.Nullable;
import javax.inject.Inject;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/DriverFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/DriverFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/DriverFactoryImplTest.java
deleted file mode 100644
index 0f61922..0000000
--- a/src/test/java/org/apache/aurora/scheduler/DriverFactoryImplTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Properties;
-
-import com.google.common.base.Throwables;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.scheduler.DriverFactory.DriverFactoryImpl;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class DriverFactoryImplTest extends EasyMockTest {
-
- @Test(expected = IllegalStateException.class)
- public void testMissingPropertiesParsing() {
- Properties testProperties = new Properties();
- testProperties.put(DriverFactoryImpl.PRINCIPAL_KEY, "aurora-scheduler");
-
- ByteArrayOutputStream propertiesStream = new ByteArrayOutputStream();
- try {
- testProperties.store(propertiesStream, "");
- } catch (IOException e) {
- throw Throwables.propagate(e);
- }
-
- control.replay();
- DriverFactoryImpl.parseCredentials(new ByteArrayInputStream(propertiesStream.toByteArray()));
- }
-
- @Test
- public void testPropertiesParsing() {
- Properties testProperties = new Properties();
- testProperties.put(DriverFactoryImpl.PRINCIPAL_KEY, "aurora-scheduler");
- testProperties.put(DriverFactoryImpl.SECRET_KEY, "secret");
-
- ByteArrayOutputStream propertiesStream = new ByteArrayOutputStream();
- try {
- testProperties.store(propertiesStream, "");
- } catch (IOException e) {
- throw Throwables.propagate(e);
- }
-
- control.replay();
- assertEquals(testProperties,
- DriverFactoryImpl.parseCredentials(
- new ByteArrayInputStream(propertiesStream.toByteArray())));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/DriverTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/DriverTest.java b/src/test/java/org/apache/aurora/scheduler/DriverTest.java
deleted file mode 100644
index a96dd87..0000000
--- a/src/test/java/org/apache/aurora/scheduler/DriverTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.scheduler.Driver.DriverImpl;
-import org.apache.mesos.Protos;
-import org.apache.mesos.SchedulerDriver;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.mesos.Protos.Status.DRIVER_ABORTED;
-import static org.apache.mesos.Protos.Status.DRIVER_RUNNING;
-import static org.easymock.EasyMock.expect;
-
-public class DriverTest extends EasyMockTest {
-
- private static final String TASK_1 = "1";
- private static final String TASK_2 = "2";
-
- private SchedulerDriver schedulerDriver;
- private DriverImpl driver;
-
- private static Protos.TaskID createTaskId(String taskId) {
- return Protos.TaskID.newBuilder().setValue(taskId).build();
- }
-
- @Before
- public void setUp() {
- schedulerDriver = createMock(SchedulerDriver.class);
- driver = new DriverImpl();
- }
-
- @Test
- public void testNoopStop() {
- control.replay();
-
- driver.stop();
- }
-
- @Test
- public void testMultipleStops() {
- expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
- control.replay();
-
- driver.initialize(schedulerDriver);
- driver.stop();
- driver.stop();
- }
-
- @Test
- public void testStop() {
- expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
- control.replay();
-
- driver.initialize(schedulerDriver);
- driver.stop();
- }
-
- @Test
- public void testNormalLifecycle() {
- expect(schedulerDriver.killTask(createTaskId(TASK_1))).andReturn(DRIVER_RUNNING);
- expect(schedulerDriver.killTask(createTaskId(TASK_2))).andReturn(DRIVER_RUNNING);
- expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
- control.replay();
-
- driver.initialize(schedulerDriver);
- driver.killTask(TASK_1);
- driver.killTask(TASK_2);
- driver.stop();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testMustRunBeforeKill() {
- control.replay();
-
- driver.killTask(TASK_1);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testOnlyOneSetAllowed() {
- control.replay();
-
- driver.initialize(schedulerDriver);
- driver.initialize(schedulerDriver);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
deleted file mode 100644
index 8dd908e..0000000
--- a/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.testing.TearDown;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.TypeLiteral;
-import com.twitter.common.application.Lifecycle;
-import com.twitter.common.base.Command;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.scheduler.base.Conversions;
-import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.MasterInfo;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskState;
-import org.apache.mesos.Protos.TaskStatus;
-import org.apache.mesos.SchedulerDriver;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertTrue;
-
-public class MesosSchedulerImplTest extends EasyMockTest {
-
- private static final String FRAMEWORK_ID = "framework-id";
- private static final FrameworkID FRAMEWORK =
- FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build();
-
- private static final String SLAVE_HOST = "slave-hostname";
- private static final SlaveID SLAVE_ID = SlaveID.newBuilder().setValue("slave-id").build();
- private static final String SLAVE_HOST_2 = "slave-hostname-2";
- private static final SlaveID SLAVE_ID_2 = SlaveID.newBuilder().setValue("slave-id-2").build();
- private static final ExecutorID EXECUTOR_ID =
- ExecutorID.newBuilder().setValue("executor-id").build();
-
- private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("offer-id").build();
- private static final Offer OFFER = Offer.newBuilder()
- .setFrameworkId(FRAMEWORK)
- .setSlaveId(SLAVE_ID)
- .setHostname(SLAVE_HOST)
- .setId(OFFER_ID)
- .build();
- private static final OfferID OFFER_ID_2 = OfferID.newBuilder().setValue("offer-id-2").build();
- private static final Offer OFFER_2 = Offer.newBuilder(OFFER)
- .setSlaveId(SLAVE_ID_2)
- .setHostname(SLAVE_HOST_2)
- .setId(OFFER_ID_2)
- .build();
-
- private static final TaskStatus STATUS = TaskStatus.newBuilder()
- .setState(TaskState.TASK_RUNNING)
- .setTaskId(TaskID.newBuilder().setValue("task-id").build())
- .build();
-
- private StorageTestUtil storageUtil;
- private TaskLauncher systemLauncher;
- private TaskLauncher userLauncher;
- private SchedulerDriver driver;
- private EventSink eventSink;
-
- private MesosSchedulerImpl scheduler;
-
- @Before
- public void setUp() {
- storageUtil = new StorageTestUtil(this);
- final Lifecycle lifecycle =
- new Lifecycle(createMock(Command.class), createMock(UncaughtExceptionHandler.class));
- systemLauncher = createMock(TaskLauncher.class);
- userLauncher = createMock(TaskLauncher.class);
- eventSink = createMock(EventSink.class);
-
- Injector injector = Guice.createInjector(new AbstractModule() {
- @Override
- protected void configure() {
- bind(Storage.class).toInstance(storageUtil.storage);
- bind(Lifecycle.class).toInstance(lifecycle);
- bind(new TypeLiteral<List<TaskLauncher>>() { })
- .toInstance(Arrays.asList(systemLauncher, userLauncher));
- bind(EventSink.class).toInstance(eventSink);
- bind(Executor.class).annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)
- .toInstance(MoreExecutors.sameThreadExecutor());
- }
- });
- scheduler = injector.getInstance(MesosSchedulerImpl.class);
- driver = createMock(SchedulerDriver.class);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testBadOrdering() {
- control.replay();
-
- // Should fail since the scheduler is not yet registered.
- scheduler.resourceOffers(driver, ImmutableList.<Offer>of());
- }
-
- @Test
- public void testNoOffers() throws Exception {
- new RegisteredFixture() {
- @Override
- void test() {
- scheduler.resourceOffers(driver, ImmutableList.<Offer>of());
- }
- }.run();
- }
-
- @Test
- public void testNoAccepts() throws Exception {
- new OfferFixture() {
- @Override
- void respondToOffer() throws Exception {
- expectOfferAttributesSaved(OFFER);
- expect(systemLauncher.willUse(OFFER)).andReturn(false);
- expect(userLauncher.willUse(OFFER)).andReturn(false);
- }
- }.run();
- }
-
- @Test
- public void testOfferFirstAccepts() throws Exception {
- new OfferFixture() {
- @Override
- void respondToOffer() throws Exception {
- expectOfferAttributesSaved(OFFER);
- expect(systemLauncher.willUse(OFFER)).andReturn(true);
- }
- }.run();
- }
-
- @Test
- public void testOfferSchedulerAccepts() throws Exception {
- new OfferFixture() {
- @Override
- void respondToOffer() throws Exception {
- expectOfferAttributesSaved(OFFER);
- expect(systemLauncher.willUse(OFFER)).andReturn(false);
- expect(userLauncher.willUse(OFFER)).andReturn(true);
- }
- }.run();
- }
-
- @Test
- public void testStatusUpdateNoAccepts() throws Exception {
- new StatusFixture() {
- @Override
- void expectations() throws Exception {
- expect(systemLauncher.statusUpdate(STATUS)).andReturn(false);
- expect(userLauncher.statusUpdate(STATUS)).andReturn(false);
- }
- }.run();
- }
-
- @Test
- public void testStatusUpdateFirstAccepts() throws Exception {
- new StatusFixture() {
- @Override
- void expectations() throws Exception {
- expect(systemLauncher.statusUpdate(STATUS)).andReturn(true);
- }
- }.run();
- }
-
- @Test
- public void testStatusUpdateSecondAccepts() throws Exception {
- new StatusFixture() {
- @Override
- void expectations() throws Exception {
- expect(systemLauncher.statusUpdate(STATUS)).andReturn(false);
- expect(userLauncher.statusUpdate(STATUS)).andReturn(true);
- }
- }.run();
- }
-
- @Test(expected = SchedulerException.class)
- public void testStatusUpdateFails() throws Exception {
- new StatusFixture() {
- @Override
- void expectations() throws Exception {
- expect(systemLauncher.statusUpdate(STATUS)).andReturn(false);
- expect(userLauncher.statusUpdate(STATUS)).andThrow(new StorageException("Injected."));
- }
- }.run();
- }
-
- @Test
- public void testMultipleOffers() throws Exception {
- new RegisteredFixture() {
- @Override
- void expectations() throws Exception {
- expectOfferAttributesSaved(OFFER);
- expectOfferAttributesSaved(OFFER_2);
- expect(systemLauncher.willUse(OFFER)).andReturn(false);
- expect(userLauncher.willUse(OFFER)).andReturn(true);
- expect(systemLauncher.willUse(OFFER_2)).andReturn(false);
- expect(userLauncher.willUse(OFFER_2)).andReturn(false);
- }
-
- @Override
- void test() {
- scheduler.resourceOffers(driver, ImmutableList.of(OFFER, OFFER_2));
- }
- }.run();
- }
-
- @Test
- public void testDisconnected() throws Exception {
- new RegisteredFixture() {
- @Override
- void expectations() throws Exception {
- eventSink.post(new DriverDisconnected());
- }
-
- @Override
- void test() {
- scheduler.disconnected(driver);
- }
- }.run();
- }
-
- @Test
- public void testFrameworkMessageIgnored() throws Exception {
- control.replay();
-
- scheduler.frameworkMessage(
- driver,
- EXECUTOR_ID,
- SLAVE_ID,
- "hello".getBytes(StandardCharsets.UTF_8));
- }
-
- private void expectOfferAttributesSaved(Offer offer) {
- storageUtil.attributeStore.saveHostAttributes(Conversions.getAttributes(offer));
- }
-
- private abstract class RegisteredFixture {
- private final AtomicBoolean runCalled = new AtomicBoolean(false);
-
- RegisteredFixture() throws Exception {
- // Prevent otherwise silent noop tests that forget to call run().
- addTearDown(new TearDown() {
- @Override
- public void tearDown() {
- assertTrue(runCalled.get());
- }
- });
- }
-
- void run() throws Exception {
- runCalled.set(true);
- eventSink.post(new DriverRegistered());
- storageUtil.expectOperations();
- storageUtil.schedulerStore.saveFrameworkId(FRAMEWORK_ID);
- expectations();
-
- control.replay();
-
- scheduler.registered(driver, FRAMEWORK, MasterInfo.getDefaultInstance());
- test();
- }
-
- void expectations() throws Exception {
- // Default no-op, subclasses may override.
- }
-
- abstract void test();
- }
-
- private abstract class OfferFixture extends RegisteredFixture {
- OfferFixture() throws Exception {
- super();
- }
-
- abstract void respondToOffer() throws Exception;
-
- @Override
- void expectations() throws Exception {
- respondToOffer();
- }
-
- @Override
- void test() {
- scheduler.resourceOffers(driver, ImmutableList.of(OFFER));
- }
- }
-
- private abstract class StatusFixture extends RegisteredFixture {
- StatusFixture() throws Exception {
- super();
- }
-
- @Override
- void test() {
- scheduler.statusUpdate(driver, STATUS);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/MesosTaskFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/MesosTaskFactoryImplTest.java
deleted file mode 100644
index aa026cf..0000000
--- a/src/test/java/org/apache/aurora/scheduler/MesosTaskFactoryImplTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.quantity.Data;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.MesosTaskFactory.ExecutorConfig;
-import org.apache.aurora.scheduler.MesosTaskFactory.MesosTaskFactoryImpl;
-import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.mesos.Protos.CommandInfo;
-import org.apache.mesos.Protos.CommandInfo.URI;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class MesosTaskFactoryImplTest {
-
- private static final String EXECUTOR_PATH = "/twitter/fake/executor.sh";
- private static final IAssignedTask TASK = IAssignedTask.build(new AssignedTask()
- .setInstanceId(2)
- .setTaskId("task-id")
- .setAssignedPorts(ImmutableMap.of("http", 80))
- .setTask(new TaskConfig()
- .setJob(new JobKey("role", "environment", "job-name"))
- .setOwner(new Identity("role", "user"))
- .setEnvironment("environment")
- .setJobName("job-name")
- .setDiskMb(10)
- .setRamMb(100)
- .setNumCpus(5)
- .setRequestedPorts(ImmutableSet.of("http"))));
- private static final SlaveID SLAVE = SlaveID.newBuilder().setValue("slave-id").build();
-
- private MesosTaskFactory taskFactory;
-
- @Before
- public void setUp() {
- taskFactory = new MesosTaskFactoryImpl(new ExecutorConfig(EXECUTOR_PATH));
- }
-
- private static final ExecutorInfo DEFAULT_EXECUTOR = ExecutorInfo.newBuilder()
- .setExecutorId(MesosTaskFactoryImpl.getExecutorId(TASK.getTaskId()))
- .setName(MesosTaskFactoryImpl.EXECUTOR_NAME)
- .setSource(MesosTaskFactoryImpl.getInstanceSourceName(TASK.getTask(), TASK.getInstanceId()))
- .addResources(Resources.makeMesosResource(Resources.CPUS, ResourceSlot.EXECUTOR_CPUS.get()))
- .addResources(Resources.makeMesosResource(
- Resources.RAM_MB,
- ResourceSlot.EXECUTOR_RAM.get().as(Data.MB)))
- .setCommand(CommandInfo.newBuilder()
- .setValue("./executor.sh")
- .addUris(URI.newBuilder().setValue(EXECUTOR_PATH).setExecutable(true)))
- .build();
-
- @Test
- public void testExecutorInfoUnchanged() {
- TaskInfo task = taskFactory.createFrom(TASK, SLAVE);
- assertEquals(DEFAULT_EXECUTOR, task.getExecutor());
- assertEquals(ImmutableSet.of(
- Resources.makeMesosResource(Resources.CPUS, TASK.getTask().getNumCpus()),
- Resources.makeMesosResource(Resources.RAM_MB, TASK.getTask().getRamMb()),
- Resources.makeMesosResource(Resources.DISK_MB, TASK.getTask().getDiskMb()),
- Resources.makeMesosRangeResource(
- Resources.PORTS,
- ImmutableSet.copyOf(TASK.getAssignedPorts().values()))
- ),
- ImmutableSet.copyOf(task.getResourcesList()));
- }
-
- @Test
- public void testCreateFromPortsUnset() {
- AssignedTask assignedTask = TASK.newBuilder();
- assignedTask.unsetAssignedPorts();
- TaskInfo task = taskFactory.createFrom(IAssignedTask.build(assignedTask), SLAVE);
- assertEquals(DEFAULT_EXECUTOR, task.getExecutor());
- assertEquals(ImmutableSet.of(
- Resources.makeMesosResource(Resources.CPUS, TASK.getTask().getNumCpus()),
- Resources.makeMesosResource(Resources.RAM_MB, TASK.getTask().getRamMb()),
- Resources.makeMesosResource(Resources.DISK_MB, TASK.getTask().getDiskMb())
- ),
- ImmutableSet.copyOf(task.getResourcesList()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
index 9861601..97ecb74 100644
--- a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
@@ -16,7 +16,6 @@ package org.apache.aurora.scheduler;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.TimeUnit;
-import com.google.common.base.Optional;
import com.twitter.common.application.Lifecycle;
import com.twitter.common.application.ShutdownRegistry;
import com.twitter.common.base.Command;
@@ -27,16 +26,14 @@ import com.twitter.common.zookeeper.SingletonService.LeaderControl;
import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
import org.apache.aurora.GuavaUtils.ServiceManagerIface;
-import org.apache.aurora.scheduler.Driver.SettableDriver;
import org.apache.aurora.scheduler.SchedulerLifecycle.DelayedActions;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
+import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
import org.apache.aurora.scheduler.storage.Storage.StorageException;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.apache.mesos.Protos.Status;
-import org.apache.mesos.SchedulerDriver;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Before;
@@ -54,12 +51,10 @@ public class SchedulerLifecycleTest extends EasyMockTest {
private static final String FRAMEWORK_ID = "framework id";
- private DriverFactory driverFactory;
private StorageTestUtil storageUtil;
private ShutdownSystem shutdownRegistry;
- private SettableDriver driver;
+ private Driver driver;
private LeaderControl leaderControl;
- private SchedulerDriver schedulerDriver;
private DelayedActions delayedActions;
private EventSink eventSink;
private FakeStatsProvider statsProvider;
@@ -69,12 +64,10 @@ public class SchedulerLifecycleTest extends EasyMockTest {
@Before
public void setUp() {
- driverFactory = createMock(DriverFactory.class);
storageUtil = new StorageTestUtil(this);
shutdownRegistry = createMock(ShutdownSystem.class);
- driver = createMock(SettableDriver.class);
+ driver = createMock(Driver.class);
leaderControl = createMock(LeaderControl.class);
- schedulerDriver = createMock(SchedulerDriver.class);
delayedActions = createMock(DelayedActions.class);
eventSink = createMock(EventSink.class);
statsProvider = new FakeStatsProvider();
@@ -96,7 +89,6 @@ public class SchedulerLifecycleTest extends EasyMockTest {
control.replay();
schedulerLifecycle = new SchedulerLifecycle(
- driverFactory,
storageUtil.storage,
new Lifecycle(shutdownRegistry, new UncaughtExceptionHandler() {
@Override
@@ -119,12 +111,11 @@ public class SchedulerLifecycleTest extends EasyMockTest {
private void expectLoadStorage() {
storageUtil.storage.start(EasyMock.<Quiet>anyObject());
storageUtil.expectOperations();
- expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
}
private void expectInitializeDriver() {
- driver.initialize(schedulerDriver);
- expect(schedulerDriver.start()).andReturn(Status.DRIVER_RUNNING);
+ expect(driver.startAsync()).andReturn(driver);
+ driver.awaitRunning();
delayedActions.blockingDriverJoin(EasyMock.<Runnable>anyObject());
}
@@ -137,7 +128,8 @@ public class SchedulerLifecycleTest extends EasyMockTest {
private void expectShutdown() throws Exception {
leaderControl.leave();
- driver.stop();
+ expect(driver.stopAsync()).andReturn(driver);
+ driver.awaitTerminated();
storageUtil.storage.stop();
shutdownRegistry.execute();
}
@@ -150,7 +142,6 @@ public class SchedulerLifecycleTest extends EasyMockTest {
storageUtil.storage.prepare();
expectLoadStorage();
- expect(driverFactory.apply(FRAMEWORK_ID)).andReturn(schedulerDriver);
Capture<Runnable> triggerFailover = createCapture();
delayedActions.onAutoFailover(capture(triggerFailover));
delayedActions.onRegistrationTimeout(EasyMock.<Runnable>anyObject());
@@ -176,11 +167,11 @@ public class SchedulerLifecycleTest extends EasyMockTest {
public void testRegistrationTimeout() throws Exception {
storageUtil.storage.prepare();
expectLoadStorage();
- expect(driverFactory.apply(FRAMEWORK_ID)).andReturn(schedulerDriver);
delayedActions.onAutoFailover(EasyMock.<Runnable>anyObject());
Capture<Runnable> registrationTimeout = createCapture();
delayedActions.onRegistrationTimeout(capture(registrationTimeout));
- expect(schedulerDriver.start()).andReturn(Status.DRIVER_RUNNING);
+ expect(driver.startAsync()).andReturn(driver);
+ driver.awaitRunning();
expectShutdown();
@@ -195,10 +186,10 @@ public class SchedulerLifecycleTest extends EasyMockTest {
public void testDefeatedBeforeRegistered() throws Exception {
storageUtil.storage.prepare();
expectLoadStorage();
- expect(driverFactory.apply(FRAMEWORK_ID)).andReturn(schedulerDriver);
delayedActions.onAutoFailover(EasyMock.<Runnable>anyObject());
delayedActions.onRegistrationTimeout(EasyMock.<Runnable>anyObject());
- expect(schedulerDriver.start()).andReturn(Status.DRIVER_RUNNING);
+ expect(driver.startAsync()).andReturn(driver);
+ driver.awaitRunning();
// Important piece here is what's absent - leader presence is not advertised.
expectShutdown();
@@ -234,7 +225,6 @@ public class SchedulerLifecycleTest extends EasyMockTest {
public void testExternalShutdown() throws Exception {
storageUtil.storage.prepare();
expectLoadStorage();
- expect(driverFactory.apply(FRAMEWORK_ID)).andReturn(schedulerDriver);
Capture<Runnable> triggerFailover = createCapture();
delayedActions.onAutoFailover(capture(triggerFailover));
delayedActions.onRegistrationTimeout(EasyMock.<Runnable>anyObject());
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 7137971..c903894 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -75,31 +75,39 @@ import org.apache.aurora.gen.storage.SaveTasks;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.gen.storage.Transaction;
import org.apache.aurora.gen.storage.storageConstants;
-import org.apache.aurora.scheduler.DriverFactory;
-import org.apache.aurora.scheduler.MesosTaskFactory.ExecutorConfig;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.log.Log;
import org.apache.aurora.scheduler.log.Log.Entry;
import org.apache.aurora.scheduler.log.Log.Position;
import org.apache.aurora.scheduler.log.Log.Stream;
+import org.apache.aurora.scheduler.mesos.DriverFactory;
+import org.apache.aurora.scheduler.mesos.DriverSettings;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory.ExecutorConfig;
import org.apache.aurora.scheduler.storage.backup.BackupModule;
import org.apache.aurora.scheduler.storage.log.EntrySerializer;
import org.apache.aurora.scheduler.storage.log.LogStorageModule;
import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher;
import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher.StreamMatcher;
+import org.apache.mesos.Protos;
import org.apache.mesos.Protos.FrameworkID;
import org.apache.mesos.Protos.MasterInfo;
import org.apache.mesos.Protos.Status;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
+import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IMocksControl;
import org.junit.Before;
import org.junit.Test;
+import static com.twitter.common.testing.easymock.EasyMockTest.createCapture;
+
+import static org.apache.mesos.Protos.FrameworkInfo;
+import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.createControl;
+import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -114,6 +122,14 @@ public class SchedulerIT extends BaseZooKeeperTest {
private static final String STATS_URL_PREFIX = "fake_url";
private static final String FRAMEWORK_ID = "integration_test_framework_id";
+ private static final DriverSettings SETTINGS = new DriverSettings(
+ "fakemaster",
+ Optional.<Protos.Credential>absent(),
+ FrameworkInfo.newBuilder()
+ .setUser("framework user")
+ .setName("test framework")
+ .build());
+
private ExecutorService executor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("SchedulerIT-%d").setDaemon(true).build());
private AtomicReference<Optional<RuntimeException>> mainException =
@@ -176,6 +192,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
@Override
protected void configure() {
bind(DriverFactory.class).toInstance(driverFactory);
+ bind(DriverSettings.class).toInstance(SETTINGS);
bind(Log.class).toInstance(log);
bind(ExecutorConfig.class).toInstance(new ExecutorConfig("/executor/thermos"));
install(new BackupModule(backupDir, SnapshotStoreImpl.class));
@@ -297,7 +314,13 @@ public class SchedulerIT extends BaseZooKeeperTest {
@Test
public void testLaunch() throws Exception {
- expect(driverFactory.apply(null)).andReturn(driver).anyTimes();
+ Capture<Scheduler> scheduler = createCapture();
+ expect(driverFactory.create(
+ capture(scheduler),
+ eq(SETTINGS.getCredentials()),
+ eq(SETTINGS.getFrameworkInfo()),
+ eq(SETTINGS.getMasterUri())))
+ .andReturn(driver).anyTimes();
ScheduledTask snapshotTask = makeTask("snapshotTask", ScheduleStatus.ASSIGNED);
ScheduledTask transactionTask = makeTask("transactionTask", ScheduleStatus.RUNNING);
@@ -346,7 +369,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
startScheduler();
driverStarted.await();
- injector.getInstance(Scheduler.class).registered(driver,
+ scheduler.getValue().registered(driver,
FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
MasterInfo.getDefaultInstance());
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/app/local/FakeMaster.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/FakeMaster.java b/src/test/java/org/apache/aurora/scheduler/app/local/FakeMaster.java
index 98718c4..f4214fb 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/FakeMaster.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/FakeMaster.java
@@ -25,16 +25,21 @@ import java.util.logging.Logger;
import javax.inject.Inject;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
import org.apache.aurora.scheduler.app.local.simulator.Events.Started;
+import org.apache.aurora.scheduler.mesos.DriverFactory;
+import org.apache.mesos.Protos;
import org.apache.mesos.Protos.ExecutorID;
import org.apache.mesos.Protos.Filters;
import org.apache.mesos.Protos.FrameworkID;
@@ -53,11 +58,13 @@ import org.apache.mesos.SchedulerDriver;
import static java.util.Objects.requireNonNull;
+import static org.apache.mesos.Protos.FrameworkInfo;
+
/**
* A simulated master for use in scheduler testing.
*/
@SuppressWarnings("deprecation")
-public class FakeMaster implements SchedulerDriver {
+public class FakeMaster implements SchedulerDriver, DriverFactory {
private static final Logger LOG = Logger.getLogger(FakeMaster.class.getName());
@@ -71,12 +78,11 @@ public class FakeMaster implements SchedulerDriver {
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private final CountDownLatch stopped = new CountDownLatch(1);
- private final Scheduler scheduler;
+ private final SettableFuture<Scheduler> schedulerFuture = SettableFuture.create();
private final EventBus eventBus;
@Inject
- FakeMaster(Scheduler scheduler, EventBus eventBus) {
- this.scheduler = requireNonNull(scheduler);
+ FakeMaster(EventBus eventBus) {
this.eventBus = requireNonNull(eventBus);
}
@@ -95,17 +101,28 @@ public class FakeMaster implements SchedulerDriver {
assertNotStopped();
checkState(activeTasks.containsKey(task), "Task " + task + " does not exist.");
- scheduler.statusUpdate(this, TaskStatus.newBuilder()
+ Futures.getUnchecked(schedulerFuture).statusUpdate(this, TaskStatus.newBuilder()
.setTaskId(task)
.setState(state)
.build());
}
@Override
+ public SchedulerDriver create(
+ Scheduler scheduler,
+ Optional<Protos.Credential> credentials,
+ FrameworkInfo frameworkInfo,
+ String master) {
+
+ schedulerFuture.set(scheduler);
+ return this;
+ }
+
+ @Override
public Status start() {
assertNotStopped();
- scheduler.registered(this,
+ Futures.getUnchecked(schedulerFuture).registered(this,
FrameworkID.newBuilder().setValue("local").build(),
MasterInfo.getDefaultInstance());
@@ -127,7 +144,7 @@ public class FakeMaster implements SchedulerDriver {
if (allOffers.isEmpty()) {
LOG.info("All offers consumed, suppressing offer cycle.");
} else {
- scheduler.resourceOffers(FakeMaster.this, allOffers);
+ Futures.getUnchecked(schedulerFuture).resourceOffers(FakeMaster.this, allOffers);
}
}
},
@@ -182,7 +199,7 @@ public class FakeMaster implements SchedulerDriver {
private void checkState(boolean assertion, String failureMessage) {
if (!assertion) {
- scheduler.error(this, failureMessage);
+ Futures.getUnchecked(schedulerFuture).error(this, failureMessage);
stop();
throw new IllegalStateException(failureMessage);
}
@@ -217,10 +234,12 @@ public class FakeMaster implements SchedulerDriver {
new Runnable() {
@Override
public void run() {
- scheduler.statusUpdate(FakeMaster.this, TaskStatus.newBuilder()
- .setTaskId(task.getTaskId())
- .setState(TaskState.TASK_RUNNING)
- .build());
+ Futures.getUnchecked(schedulerFuture).statusUpdate(
+ FakeMaster.this,
+ TaskStatus.newBuilder()
+ .setTaskId(task.getTaskId())
+ .setState(TaskState.TASK_RUNNING)
+ .build());
}
},
1,
@@ -247,7 +266,7 @@ public class FakeMaster implements SchedulerDriver {
checkState(task != null, "Task " + taskId + " not found.");
idleOffers.put(task.getOffer().getId(), task.getOffer());
- scheduler.statusUpdate(this, TaskStatus.newBuilder()
+ Futures.getUnchecked(schedulerFuture).statusUpdate(this, TaskStatus.newBuilder()
.setTaskId(taskId)
.setState(TaskState.TASK_FINISHED)
.build());
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
index 43d73b1..640acdf 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
@@ -16,10 +16,9 @@ package org.apache.aurora.scheduler.app.local;
import java.io.File;
import java.util.List;
-import javax.annotation.Nullable;
-import javax.inject.Inject;
import javax.inject.Singleton;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
import com.google.inject.AbstractModule;
@@ -29,13 +28,15 @@ import com.twitter.common.application.AppLauncher;
import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.scheduler.DriverFactory;
import org.apache.aurora.scheduler.app.SchedulerMain;
import org.apache.aurora.scheduler.app.local.simulator.ClusterSimulatorModule;
+import org.apache.aurora.scheduler.mesos.DriverFactory;
+import org.apache.aurora.scheduler.mesos.DriverSettings;
import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
import org.apache.aurora.scheduler.storage.log.LogStorage;
+import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;
/**
@@ -43,6 +44,14 @@ import org.apache.mesos.SchedulerDriver;
*/
public class LocalSchedulerMain extends SchedulerMain {
+ private static final DriverSettings DRIVER_SETTINGS = new DriverSettings(
+ "fakemaster",
+ Optional.<Protos.Credential>absent(),
+ Protos.FrameworkInfo.newBuilder()
+ .setUser("framework user")
+ .setName("test framework")
+ .build());
+
@Override
protected Module getPersistentStorageModule() {
return new AbstractModule() {
@@ -65,28 +74,15 @@ public class LocalSchedulerMain extends SchedulerMain {
return new AbstractModule() {
@Override
protected void configure() {
- bind(DriverFactory.class).to(FakeDriverFactory.class);
+ bind(DriverSettings.class).toInstance(DRIVER_SETTINGS);
bind(SchedulerDriver.class).to(FakeMaster.class);
+ bind(DriverFactory.class).to(FakeMaster.class);
bind(FakeMaster.class).in(Singleton.class);
install(new ClusterSimulatorModule());
}
};
}
- static class FakeDriverFactory implements DriverFactory {
- private final SchedulerDriver driver;
-
- @Inject
- FakeDriverFactory(SchedulerDriver driver) {
- this.driver = driver;
- }
-
- @Override
- public SchedulerDriver apply(@Nullable String input) {
- return driver;
- }
- }
-
public static void main(String[] args) {
File backupDir = Files.createTempDir();
backupDir.deleteOnExit();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
index 62d8aab..962aff8 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
@@ -29,9 +29,9 @@ import com.twitter.common.stats.StatsProvider;
import com.twitter.common.testing.easymock.EasyMockTest;
import com.twitter.common.util.Clock;
-import org.apache.aurora.scheduler.Driver;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.state.TaskAssigner;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
index 059a276..758a8d4 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
@@ -35,11 +35,11 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.comm.AdjustRetainedTasks;
-import org.apache.aurora.scheduler.Driver;
import org.apache.aurora.scheduler.async.GcExecutorLauncher.GcExecutorSettings;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.mesos.Protos.FrameworkID;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java b/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
index 8ad9f5c..662ebdc 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
@@ -37,10 +37,10 @@ import com.twitter.common.util.BackoffStrategy;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.scheduler.Driver;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
index 15fb7ff..e2a198a 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
@@ -30,11 +30,11 @@ import com.twitter.common.testing.easymock.EasyMockTest;
import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.scheduler.Driver;
import org.apache.aurora.scheduler.async.OfferQueue.LaunchException;
import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
+import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.TaskInfo;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java
index 1d84496..51256f4 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java
@@ -18,7 +18,6 @@ import java.util.concurrent.ScheduledFuture;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.RateLimiter;
-
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.testing.easymock.EasyMockTest;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 1caaf14..7736d4c 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -40,7 +40,6 @@ import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.Driver;
import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
@@ -50,6 +49,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent.HostMaintenanceStateChange
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.state.TaskAssigner;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
index e79327c..17295ac 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.async;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Optional;
@@ -57,7 +56,7 @@ import static org.junit.Assert.assertEquals;
public class TaskTimeoutTest extends EasyMockTest {
private static final String TASK_ID = "task_id";
- private static final long TIMEOUT_MS = Amount.of(1L, Time.MINUTES).as(Time.MILLISECONDS);
+ private static final Amount<Long, Time> TIMEOUT = Amount.of(1L, Time.MINUTES);
private AtomicLong timedOutTaskCounter;
private ScheduledExecutorService executor;
@@ -81,25 +80,22 @@ public class TaskTimeoutTest extends EasyMockTest {
private void replayAndCreate() {
control.replay();
- timeout = new TaskTimeout(
- executor,
- stateManager,
- Amount.of(TIMEOUT_MS, Time.MILLISECONDS),
- statsProvider);
+ timeout = new TaskTimeout(executor, stateManager, TIMEOUT, statsProvider);
+ timeout.startAsync().awaitRunning();
}
- private Capture<Runnable> expectTaskWatch(long expireMs) {
+ private Capture<Runnable> expectTaskWatch(Amount<Long, Time> expireIn) {
Capture<Runnable> capture = createCapture();
executor.schedule(
EasyMock.capture(capture),
- eq(expireMs),
- eq(TimeUnit.MILLISECONDS));
+ eq((long) expireIn.getValue()),
+ eq(expireIn.getUnit().getTimeUnit()));
expectLastCall().andReturn(future);
return capture;
}
private Capture<Runnable> expectTaskWatch() {
- return expectTaskWatch(TIMEOUT_MS);
+ return expectTaskWatch(TIMEOUT);
}
private void changeState(String taskId, ScheduleStatus from, ScheduleStatus to) {
@@ -197,17 +193,17 @@ public class TaskTimeoutTest extends EasyMockTest {
@Test
public void testStorageStart() {
- expectTaskWatch(TIMEOUT_MS);
- expectTaskWatch(TIMEOUT_MS);
- expectTaskWatch(TIMEOUT_MS);
+ expectTaskWatch(TIMEOUT);
+ expectTaskWatch(TIMEOUT);
+ expectTaskWatch(TIMEOUT);
replayAndCreate();
- clock.setNowMillis(TIMEOUT_MS * 2);
+ clock.setNowMillis(TIMEOUT.as(Time.MILLISECONDS) * 2);
for (IScheduledTask task : ImmutableList.of(
makeTask("a", ASSIGNED, 0),
- makeTask("b", KILLING, TIMEOUT_MS),
- makeTask("c", PREEMPTING, clock.nowMillis() + TIMEOUT_MS))) {
+ makeTask("b", KILLING, TIMEOUT.as(Time.MILLISECONDS)),
+ makeTask("c", PREEMPTING, clock.nowMillis() + TIMEOUT.as(Time.MILLISECONDS)))) {
timeout.recordStateChange(TaskStateChange.initialized(task));
}
@@ -216,4 +212,20 @@ public class TaskTimeoutTest extends EasyMockTest {
changeState("b", KILLING, KILLED);
changeState("c", PREEMPTING, FINISHED);
}
+
+ /**
+ * Verifies that a timeout firing before the service has been started does not transition the
+ * task, and that another check is scheduled after {@code TaskTimeout.NOT_STARTED_RETRY}.
+ */
+ @Test
+ public void testTimeoutWhileNotStarted() throws Exception {
+ // The first watch is captured so the test can fire it by hand; the second is the retry.
+ Capture<Runnable> assignedTimeout = expectTaskWatch();
+ expectTaskWatch(TaskTimeout.NOT_STARTED_RETRY);
+
+ control.replay();
+ // Deliberately constructed without startAsync(), unlike replayAndCreate().
+ timeout = new TaskTimeout(executor, stateManager, TIMEOUT, statsProvider);
+
+ changeState(INIT, PENDING);
+ changeState(PENDING, ASSIGNED);
+ assignedTimeout.getValue().run();
+ // Expected value goes first so a failure reports "expected 0" rather than the reverse.
+ assertEquals(0, timedOutTaskCounter.intValue());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
new file mode 100644
index 0000000..9e17688
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+import com.google.common.base.Throwables;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@code CommandLineDriverSettingsModule.parseCredentials()}, feeding it credentials
+ * serialized in {@link Properties} format.
+ */
+public class CommandLineDriverSettingsModuleTest {
+
+ /**
+ * Serializes the given properties with {@link Properties#store} and returns a stream over the
+ * resulting bytes, suitable for passing to {@code parseCredentials()}.
+ *
+ * @param properties Properties to serialize.
+ * @return An in-memory stream positioned at the start of the serialized properties.
+ */
+ private static ByteArrayInputStream asStream(Properties properties) {
+ ByteArrayOutputStream serialized = new ByteArrayOutputStream();
+ try {
+ properties.store(serialized, "");
+ } catch (IOException e) {
+ // Not expected for an in-memory stream; rethrown unchecked to keep test signatures clean.
+ throw Throwables.propagate(e);
+ }
+ return new ByteArrayInputStream(serialized.toByteArray());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testMissingPropertiesParsing() {
+ // Only the principal is set; parsing is expected to fail with IllegalStateException.
+ Properties testProperties = new Properties();
+ testProperties.put(CommandLineDriverSettingsModule.PRINCIPAL_KEY, "aurora-scheduler");
+
+ CommandLineDriverSettingsModule.parseCredentials(asStream(testProperties));
+ }
+
+ @Test
+ public void testPropertiesParsing() {
+ // With both keys present, the parsed result should equal the properties that were stored.
+ Properties testProperties = new Properties();
+ testProperties.put(CommandLineDriverSettingsModule.PRINCIPAL_KEY, "aurora-scheduler");
+ testProperties.put(CommandLineDriverSettingsModule.SECRET_KEY, "secret");
+
+ assertEquals(
+ testProperties,
+ CommandLineDriverSettingsModule.parseCredentials(asStream(testProperties)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
new file mode 100644
index 0000000..af15c95
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.testing.TearDown;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.TypeLiteral;
+import com.twitter.common.application.Lifecycle;
+import com.twitter.common.base.Command;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.scheduler.TaskLauncher;
+import org.apache.aurora.scheduler.base.Conversions;
+import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
+import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.MasterInfo;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus;
+import org.apache.mesos.SchedulerDriver;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertTrue;
+
+public class MesosSchedulerImplTest extends EasyMockTest {
+
+ private static final String FRAMEWORK_ID = "framework-id";
+ private static final FrameworkID FRAMEWORK =
+ FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build();
+
+ private static final String SLAVE_HOST = "slave-hostname";
+ private static final SlaveID SLAVE_ID = SlaveID.newBuilder().setValue("slave-id").build();
+ private static final String SLAVE_HOST_2 = "slave-hostname-2";
+ private static final SlaveID SLAVE_ID_2 = SlaveID.newBuilder().setValue("slave-id-2").build();
+ private static final ExecutorID EXECUTOR_ID =
+ ExecutorID.newBuilder().setValue("executor-id").build();
+
+ private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("offer-id").build();
+ private static final Offer OFFER = Offer.newBuilder()
+ .setFrameworkId(FRAMEWORK)
+ .setSlaveId(SLAVE_ID)
+ .setHostname(SLAVE_HOST)
+ .setId(OFFER_ID)
+ .build();
+ private static final OfferID OFFER_ID_2 = OfferID.newBuilder().setValue("offer-id-2").build();
+ private static final Offer OFFER_2 = Offer.newBuilder(OFFER)
+ .setSlaveId(SLAVE_ID_2)
+ .setHostname(SLAVE_HOST_2)
+ .setId(OFFER_ID_2)
+ .build();
+
+ private static final TaskStatus STATUS = TaskStatus.newBuilder()
+ .setState(TaskState.TASK_RUNNING)
+ .setTaskId(TaskID.newBuilder().setValue("task-id").build())
+ .build();
+
+ private StorageTestUtil storageUtil;
+ private TaskLauncher systemLauncher;
+ private TaskLauncher userLauncher;
+ private SchedulerDriver driver;
+ private EventSink eventSink;
+
+ private MesosSchedulerImpl scheduler;
+
+ /**
+ * Creates the mocked collaborators and obtains the scheduler under test from a Guice injector
+ * that binds them.
+ */
+ @Before
+ public void setUp() {
+ storageUtil = new StorageTestUtil(this);
+
+ // Lifecycle is instantiated directly, backed by mocked collaborators.
+ Command shutdownCommand = createMock(Command.class);
+ UncaughtExceptionHandler exceptionHandler = createMock(UncaughtExceptionHandler.class);
+ final Lifecycle lifecycle = new Lifecycle(shutdownCommand, exceptionHandler);
+
+ systemLauncher = createMock(TaskLauncher.class);
+ userLauncher = createMock(TaskLauncher.class);
+ eventSink = createMock(EventSink.class);
+
+ AbstractModule testBindings = new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(Storage.class).toInstance(storageUtil.storage);
+ bind(Lifecycle.class).toInstance(lifecycle);
+ // Launcher order matters to the tests below: system launcher first, then user launcher.
+ bind(new TypeLiteral<List<TaskLauncher>>() { })
+ .toInstance(Arrays.asList(systemLauncher, userLauncher));
+ bind(EventSink.class).toInstance(eventSink);
+ // Run scheduler callbacks on the calling thread.
+ bind(Executor.class).annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)
+ .toInstance(MoreExecutors.sameThreadExecutor());
+ }
+ };
+ Injector injector = Guice.createInjector(testBindings);
+ scheduler = injector.getInstance(MesosSchedulerImpl.class);
+ driver = createMock(SchedulerDriver.class);
+ }
+
+ /** Offers delivered before {@code registered()} are expected to raise IllegalStateException. */
+ @Test(expected = IllegalStateException.class)
+ public void testBadOrdering() {
+ control.replay();
+
+ // The scheduler has not been registered yet, so this call should fail.
+ List<Offer> noOffers = ImmutableList.of();
+ scheduler.resourceOffers(driver, noOffers);
+ }
+
+ /** An empty offer batch after registration requires no launcher interaction. */
+ @Test
+ public void testNoOffers() throws Exception {
+ RegisteredFixture emptyBatch = new RegisteredFixture() {
+ @Override
+ void test() {
+ List<Offer> noOffers = ImmutableList.of();
+ scheduler.resourceOffers(driver, noOffers);
+ }
+ };
+ emptyBatch.run();
+ }
+
+ /** Both launchers are consulted for the offer and both decline it. */
+ @Test
+ public void testNoAccepts() throws Exception {
+ OfferFixture bothDecline = new OfferFixture() {
+ @Override
+ void respondToOffer() throws Exception {
+ expectOfferAttributesSaved(OFFER);
+ expect(systemLauncher.willUse(OFFER)).andReturn(false);
+ expect(userLauncher.willUse(OFFER)).andReturn(false);
+ }
+ };
+ bothDecline.run();
+ }
+
+ /** The system launcher takes the offer; no expectation is recorded for the user launcher. */
+ @Test
+ public void testOfferFirstAccepts() throws Exception {
+ OfferFixture systemAccepts = new OfferFixture() {
+ @Override
+ void respondToOffer() throws Exception {
+ expectOfferAttributesSaved(OFFER);
+ expect(systemLauncher.willUse(OFFER)).andReturn(true);
+ }
+ };
+ systemAccepts.run();
+ }
+
+ /** The system launcher declines the offer and the user launcher then takes it. */
+ @Test
+ public void testOfferSchedulerAccepts() throws Exception {
+ OfferFixture userAccepts = new OfferFixture() {
+ @Override
+ void respondToOffer() throws Exception {
+ expectOfferAttributesSaved(OFFER);
+ expect(systemLauncher.willUse(OFFER)).andReturn(false);
+ expect(userLauncher.willUse(OFFER)).andReturn(true);
+ }
+ };
+ userAccepts.run();
+ }
+
+ /** Neither launcher claims the status update. */
+ @Test
+ public void testStatusUpdateNoAccepts() throws Exception {
+ StatusFixture nobodyClaims = new StatusFixture() {
+ @Override
+ void expectations() throws Exception {
+ expect(systemLauncher.statusUpdate(STATUS)).andReturn(false);
+ expect(userLauncher.statusUpdate(STATUS)).andReturn(false);
+ }
+ };
+ nobodyClaims.run();
+ }
+
+ /** The system launcher claims the update; no expectation is recorded for the user launcher. */
+ @Test
+ public void testStatusUpdateFirstAccepts() throws Exception {
+ StatusFixture systemClaims = new StatusFixture() {
+ @Override
+ void expectations() throws Exception {
+ expect(systemLauncher.statusUpdate(STATUS)).andReturn(true);
+ }
+ };
+ systemClaims.run();
+ }
+
+ /** The system launcher passes on the update and the user launcher claims it. */
+ @Test
+ public void testStatusUpdateSecondAccepts() throws Exception {
+ StatusFixture userClaims = new StatusFixture() {
+ @Override
+ void expectations() throws Exception {
+ expect(systemLauncher.statusUpdate(STATUS)).andReturn(false);
+ expect(userLauncher.statusUpdate(STATUS)).andReturn(true);
+ }
+ };
+ userClaims.run();
+ }
+
+ /** A StorageException thrown by a launcher is expected to surface as a SchedulerException. */
+ @Test(expected = SchedulerException.class)
+ public void testStatusUpdateFails() throws Exception {
+ StatusFixture storageFailure = new StatusFixture() {
+ @Override
+ void expectations() throws Exception {
+ expect(systemLauncher.statusUpdate(STATUS)).andReturn(false);
+ expect(userLauncher.statusUpdate(STATUS)).andThrow(new StorageException("Injected."));
+ }
+ };
+ storageFailure.run();
+ }
+
+ /** Two offers in one batch: the user launcher takes the first, nobody takes the second. */
+ @Test
+ public void testMultipleOffers() throws Exception {
+ RegisteredFixture twoOffers = new RegisteredFixture() {
+ @Override
+ void expectations() throws Exception {
+ expectOfferAttributesSaved(OFFER);
+ expectOfferAttributesSaved(OFFER_2);
+ expect(systemLauncher.willUse(OFFER)).andReturn(false);
+ expect(userLauncher.willUse(OFFER)).andReturn(true);
+ expect(systemLauncher.willUse(OFFER_2)).andReturn(false);
+ expect(userLauncher.willUse(OFFER_2)).andReturn(false);
+ }
+
+ @Override
+ void test() {
+ List<Offer> batch = ImmutableList.of(OFFER, OFFER_2);
+ scheduler.resourceOffers(driver, batch);
+ }
+ };
+ twoOffers.run();
+ }
+
+ /** A driver disconnect is expected to be published as a DriverDisconnected event. */
+ @Test
+ public void testDisconnected() throws Exception {
+ RegisteredFixture disconnect = new RegisteredFixture() {
+ @Override
+ void expectations() throws Exception {
+ DriverDisconnected expectedEvent = new DriverDisconnected();
+ eventSink.post(expectedEvent);
+ }
+
+ @Override
+ void test() {
+ scheduler.disconnected(driver);
+ }
+ };
+ disconnect.run();
+ }
+
+ /** A framework message is delivered with no mock expectations recorded at all. */
+ @Test
+ public void testFrameworkMessageIgnored() throws Exception {
+ control.replay();
+
+ byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
+ scheduler.frameworkMessage(driver, EXECUTOR_ID, SLAVE_ID, payload);
+ }
+
+ // Registers the expected saveHostAttributes() call on the attribute store, using the
+ // attributes that Conversions.getAttributes() derives from the given offer.
+ private void expectOfferAttributesSaved(Offer offer) {
+ storageUtil.attributeStore.saveHostAttributes(Conversions.getAttributes(offer));
+ }
+
+ /**
+ * Base fixture for tests that need a registered scheduler. {@link #run()} records the
+ * registration expectations plus any from {@link #expectations()}, replays the mocks, registers
+ * the scheduler and finally invokes {@link #test()}.
+ */
+ private abstract class RegisteredFixture {
+ private final AtomicBoolean runCalled = new AtomicBoolean(false);
+
+ RegisteredFixture() throws Exception {
+ // Prevent otherwise silent noop tests that forget to call run().
+ addTearDown(new TearDown() {
+ @Override
+ public void tearDown() {
+ // The message makes the otherwise anonymous assertion failure self-explanatory.
+ assertTrue("Fixture was created but run() was never called.", runCalled.get());
+ }
+ });
+ }
+
+ /**
+ * Records expectations, switches the mocks to replay mode, registers the scheduler and
+ * executes the test body.
+ */
+ void run() throws Exception {
+ runCalled.set(true);
+ // Expectations for the registered() call made below.
+ eventSink.post(new DriverRegistered());
+ storageUtil.expectOperations();
+ storageUtil.schedulerStore.saveFrameworkId(FRAMEWORK_ID);
+ expectations();
+
+ control.replay();
+
+ scheduler.registered(driver, FRAMEWORK, MasterInfo.getDefaultInstance());
+ test();
+ }
+
+ /** Hook for additional mock expectations, recorded before replay. */
+ void expectations() throws Exception {
+ // Default no-op, subclasses may override.
+ }
+
+ /** The scheduler interaction under test, invoked after registration. */
+ abstract void test();
+ }
+
+ /**
+ * Fixture that offers the single {@code OFFER} to a registered scheduler; subclasses record how
+ * the launchers respond in {@link #respondToOffer()}.
+ */
+ private abstract class OfferFixture extends RegisteredFixture {
+ OfferFixture() throws Exception {
+ // The superclass constructor is invoked implicitly.
+ }
+
+ /** Records the launcher expectations for {@code OFFER}. */
+ abstract void respondToOffer() throws Exception;
+
+ @Override
+ void expectations() throws Exception {
+ respondToOffer();
+ }
+
+ @Override
+ void test() {
+ List<Offer> singleOffer = ImmutableList.of(OFFER);
+ scheduler.resourceOffers(driver, singleOffer);
+ }
+ }
+
+ /**
+ * Fixture that delivers {@code STATUS} to a registered scheduler; subclasses record launcher
+ * behavior by overriding {@code expectations()}.
+ */
+ private abstract class StatusFixture extends RegisteredFixture {
+ StatusFixture() throws Exception {
+ // The superclass constructor is invoked implicitly.
+ }
+
+ @Override
+ void test() {
+ scheduler.statusUpdate(driver, STATUS);
+ }
+ }
+}