You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2017/04/06 08:54:29 UTC
aurora git commit: Reliably subscribe to Mesos in the HTTP Driver.
Repository: aurora
Updated Branches:
refs/heads/master 7678d194f -> 656cf9ac5
Reliably subscribe to Mesos in the HTTP Driver.
As noted in AURORA-1911, the `V1Mesos` driver doesn't retry `SUBSCRIBE` calls if
they fail. This means that after a leader subscribes and disconnects, it is
possible for it to never resubscribe if the Mesos Master is unhealthy.
To fix this, I have moved the subscription into the dedicated
`SchedulerExecutor` and it continues to attempt to subscribe using truncated
binary backoff. It only stops if we are disconnected or if we successfully
connect.
Bugs closed: AURORA-1911
Reviewed at https://reviews.apache.org/r/58053/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/656cf9ac
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/656cf9ac
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/656cf9ac
Branch: refs/heads/master
Commit: 656cf9ac59754cb25084821b74ca22ef0d0ff2d5
Parents: 7678d19
Author: Zameer Manji <zm...@apache.org>
Authored: Thu Apr 6 10:40:41 2017 +0200
Committer: Zameer Manji <zm...@apache.org>
Committed: Thu Apr 6 10:40:41 2017 +0200
----------------------------------------------------------------------
.../aurora/benchmark/StatusUpdateBenchmark.java | 3 +-
.../scheduler/mesos/MesosCallbackHandler.java | 17 +--
.../scheduler/mesos/SchedulerDriverModule.java | 19 ++-
.../mesos/VersionedMesosSchedulerImpl.java | 57 +++++--
.../mesos/VersionedMesosSchedulerImplTest.java | 148 ++++++++++++++++++-
5 files changed, 211 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/656cf9ac/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
index 206b114..c81387f 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
@@ -66,6 +66,7 @@ import org.apache.aurora.scheduler.mesos.MesosCallbackHandler;
import org.apache.aurora.scheduler.mesos.MesosCallbackHandler.MesosCallbackHandlerImpl;
import org.apache.aurora.scheduler.mesos.MesosSchedulerImpl;
import org.apache.aurora.scheduler.mesos.ProtosConversion;
+import org.apache.aurora.scheduler.mesos.SchedulerDriverModule;
import org.apache.aurora.scheduler.mesos.TestExecutorSettings;
import org.apache.aurora.scheduler.offers.OfferManager;
import org.apache.aurora.scheduler.preemptor.ClusterStateImpl;
@@ -189,7 +190,7 @@ public class StatusUpdateBenchmark {
bind(MesosCallbackHandler.class).to(MesosCallbackHandlerImpl.class);
bind(MesosCallbackHandlerImpl.class).in(Singleton.class);
bind(Executor.class)
- .annotatedWith(MesosCallbackHandlerImpl.SchedulerExecutor.class)
+ .annotatedWith(SchedulerDriverModule.SchedulerExecutor.class)
.toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor(
"SchedulerImpl-%d",
LoggerFactory.getLogger(StatusUpdateBenchmark.class)));
http://git-wip-us.apache.org/repos/asf/aurora/blob/656cf9ac/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
index 5bf1e4e..5a5281a 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
@@ -13,8 +13,6 @@
*/
package org.apache.aurora.scheduler.mesos;
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
@@ -22,7 +20,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
-import javax.inject.Qualifier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
@@ -57,10 +54,6 @@ import org.apache.mesos.v1.Protos.TaskStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;
import static org.apache.mesos.v1.Protos.TaskStatus.Reason.REASON_RECONCILIATION;
@@ -106,14 +99,6 @@ public interface MesosCallbackHandler {
private final AtomicBoolean frameworkRegistered;
/**
- * Binding annotation for the executor the incoming Mesos message handler uses.
- */
- @VisibleForTesting
- @Qualifier
- @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
- public @interface SchedulerExecutor { }
-
- /**
* Creates a new handler for callbacks.
*
* @param storage Store to save host attributes into.
@@ -130,7 +115,7 @@ public interface MesosCallbackHandler {
TaskStatusHandler taskStatusHandler,
OfferManager offerManager,
EventSink eventSink,
- @SchedulerExecutor Executor executor,
+ @SchedulerDriverModule.SchedulerExecutor Executor executor,
StatsProvider statsProvider,
Driver driver,
Clock clock,
http://git-wip-us.apache.org/repos/asf/aurora/blob/656cf9ac/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
index 10d4f1b..18dc3e0 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
@@ -13,10 +13,14 @@
*/
package org.apache.aurora.scheduler.mesos;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
import java.util.concurrent.Executor;
+import javax.inject.Qualifier;
import javax.inject.Singleton;
+import com.google.common.annotations.VisibleForTesting;
import com.google.inject.AbstractModule;
import org.apache.aurora.scheduler.app.SchedulerMain;
@@ -27,6 +31,11 @@ import org.apache.mesos.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
import static com.google.common.base.Preconditions.checkState;
/**
@@ -40,6 +49,14 @@ public class SchedulerDriverModule extends AbstractModule {
this.kind = kind;
}
+ /**
+ * Binding annotation for the executor the incoming Mesos message handler uses.
+ */
+ @VisibleForTesting
+ @Qualifier
+ @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+ public @interface SchedulerExecutor { }
+
@Override
protected void configure() {
bind(Scheduler.class).to(MesosSchedulerImpl.class);
@@ -48,7 +65,7 @@ public class SchedulerDriverModule extends AbstractModule {
bind(MesosCallbackHandler.class).to(MesosCallbackHandlerImpl.class);
bind(MesosCallbackHandlerImpl.class).in(Singleton.class);
// TODO(zmanji): Create singleThreadedExecutor (non-scheduled) variant.
- bind(Executor.class).annotatedWith(MesosCallbackHandlerImpl.SchedulerExecutor.class)
+ bind(Executor.class).annotatedWith(SchedulerExecutor.class)
.toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SchedulerImpl-%d", LOG));
switch (kind) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/656cf9ac/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
index 67d356a..5329de5 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
@@ -13,6 +13,9 @@
*/
package org.apache.aurora.scheduler.mesos;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import com.google.common.base.Optional;
@@ -21,6 +24,8 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.aurora.common.inject.TimedInterceptor;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.common.util.BackoffHelper;
import org.apache.aurora.scheduler.stats.CachedCounters;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.mesos.v1.Protos;
@@ -45,8 +50,13 @@ public class VersionedMesosSchedulerImpl implements Scheduler {
private final MesosCallbackHandler handler;
private final Storage storage;
private final FrameworkInfoFactory infoFactory;
+ private final Executor executor;
+ private final BackoffHelper backoffHelper;
- private volatile boolean isRegistered = false;
+ private final AtomicBoolean isSubscribed = new AtomicBoolean(false);
+ private final AtomicBoolean isConnected = new AtomicBoolean(false);
+ private final AtomicBoolean isRegistered = new AtomicBoolean(false);
+ private final AtomicLong subcriptionCalls;
private static final String EVENT_COUNTER_STAT_PREFIX = "mesos_scheduler_event_";
// A cache to hold the metric names to prevent us from creating strings for every event
@@ -64,18 +74,26 @@ public class VersionedMesosSchedulerImpl implements Scheduler {
VersionedMesosSchedulerImpl(
MesosCallbackHandler handler,
CachedCounters counters,
+ StatsProvider statsProvider,
Storage storage,
+ @SchedulerDriverModule.SchedulerExecutor Executor executor,
+ BackoffHelper backoffHelper,
FrameworkInfoFactory factory) {
this.handler = requireNonNull(handler);
this.counters = requireNonNull(counters);
this.storage = requireNonNull(storage);
this.infoFactory = requireNonNull(factory);
+ this.executor = requireNonNull(executor);
+ this.backoffHelper = requireNonNull(backoffHelper);
initializeEventMetrics();
+
+ this.subcriptionCalls = statsProvider.makeCounter("mesos_scheduler_subscription_attempts");
}
@Override
public void connected(Mesos mesos) {
LOG.info("Connected to Mesos master.");
+ isConnected.set(true);
Optional<String> frameworkId = storage.read(
storeProvider -> storeProvider.getSchedulerStore().fetchFrameworkId());
@@ -95,15 +113,35 @@ public class VersionedMesosSchedulerImpl implements Scheduler {
LOG.warn("Did not find a persisted framework ID, connecting as a new framework.");
}
- LOG.info("Sending subscribe call");
- mesos.send(call.setSubscribe(Call.Subscribe.newBuilder()
- .setFrameworkInfo(frameworkBuilder.build())
- .build())
- .build());
+ call.setSubscribe(Call.Subscribe.newBuilder().setFrameworkInfo(frameworkBuilder));
+
+ executor.execute(() -> {
+ LOG.info("Starting to subscribe to Mesos with backoff.");
+ try {
+ backoffHelper.doUntilSuccess(() -> {
+ if (!isConnected.get()) {
+ LOG.info("Disconnected while attempting to subscribe. Stopping attempt.");
+ return true;
+ }
+ if (!isSubscribed.get()) {
+ LOG.info("Sending subscribe call.");
+ mesos.send(call.build());
+ subcriptionCalls.incrementAndGet();
+ return false;
+ }
+ LOG.info("Subscribed to Mesos");
+ return true;
+ });
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
}
@Override
public void disconnected(Mesos mesos) {
+ isSubscribed.set(false);
+ isConnected.set(false);
handler.handleDisconnection();
}
@@ -126,16 +164,17 @@ public class VersionedMesosSchedulerImpl implements Scheduler {
switch(event.getType()) {
case SUBSCRIBED:
Event.Subscribed subscribed = event.getSubscribed();
- if (isRegistered) {
+ if (isRegistered.get()) {
handler.handleReregistration(subscribed.getMasterInfo());
} else {
+ isRegistered.set(true);
handler.handleRegistration(subscribed.getFrameworkId(), subscribed.getMasterInfo());
- isRegistered = true;
}
+ isSubscribed.set(true);
break;
case OFFERS:
- checkState(isRegistered, "Must be registered before receiving offers.");
+ checkState(isSubscribed.get(), "Must be registered before receiving offers.");
handler.handleOffers(event.getOffers().getOffersList());
break;
http://git-wip-us.apache.org/repos/asf/aurora/blob/656cf9ac/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java
index 756d0d9..8c259bf 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java
@@ -13,12 +13,16 @@
*/
package org.apache.aurora.scheduler.mesos;
+import java.util.concurrent.Executors;
+
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.net.InetAddresses;
import com.google.protobuf.ByteString;
+import org.apache.aurora.common.base.ExceptionalSupplier;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.util.BackoffHelper;
import org.apache.aurora.scheduler.stats.CachedCounters;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
@@ -40,10 +44,12 @@ import org.easymock.Capture;
import org.junit.Before;
import org.junit.Test;
+import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class VersionedMesosSchedulerImplTest extends EasyMockTest {
@@ -53,6 +59,7 @@ public class VersionedMesosSchedulerImplTest extends EasyMockTest {
private Mesos driver;
private FakeStatsProvider statsProvider;
private FrameworkInfoFactory infoFactory;
+ private BackoffHelper backoffHelper;
private VersionedMesosSchedulerImpl scheduler;
@@ -145,31 +152,44 @@ public class VersionedMesosSchedulerImplTest extends EasyMockTest {
driver = createMock(Mesos.class);
statsProvider = new FakeStatsProvider();
infoFactory = createMock(FrameworkInfoFactory.class);
+ backoffHelper = createMock(BackoffHelper.class);
scheduler = new VersionedMesosSchedulerImpl(
handler,
new CachedCounters(statsProvider),
+ statsProvider,
storageUtil.storage,
+ Executors.newSingleThreadExecutor(),
+ backoffHelper,
infoFactory);
}
- @Test
- public void testConnected() {
+ @Test(timeout = 300000)
+ public void testConnected() throws Exception {
// Once the V1 driver has connected, we need to establish a subscription to get events
-
- storageUtil.expectOperations();
- expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
- expect(infoFactory.getFrameworkInfo()).andReturn(FRAMEWORK_INFO);
+ expectFrameworkInfoRead();
Capture<Call> subscribeCapture = createCapture();
driver.send(capture(subscribeCapture));
expectLastCall().once();
+ Capture<ExceptionalSupplier<Boolean, RuntimeException>> supplierCapture = createCapture();
+ backoffHelper.doUntilSuccess(capture(supplierCapture));
+ expectLastCall().once();
+
control.replay();
scheduler.connected(driver);
+ waitUntilCaptured(supplierCapture);
+
+ assertTrue(supplierCapture.hasCaptured());
+ ExceptionalSupplier<Boolean, RuntimeException> supplier = supplierCapture.getValue();
+
+ // Make one connection attempt
+ supplier.get();
+
assertTrue(subscribeCapture.hasCaptured());
Call subscribe = subscribeCapture.getValue();
@@ -181,6 +201,78 @@ public class VersionedMesosSchedulerImplTest extends EasyMockTest {
FRAMEWORK_INFO.toBuilder().setId(FRAMEWORK).build());
}
+ @Test(timeout = 300000)
+ public void testAttemptSubscriptionSuccessful() throws Exception {
+ expectFrameworkInfoRead();
+
+ // Other tests already check if what we send is correct.
+ driver.send(anyObject());
+ expectLastCall().once();
+ driver.send(anyObject());
+ expectLastCall().once();
+
+ Capture<ExceptionalSupplier<Boolean, RuntimeException>> supplierCapture = createCapture();
+ backoffHelper.doUntilSuccess(capture(supplierCapture));
+ expectLastCall().once();
+
+ handler.handleRegistration(FRAMEWORK, MASTER);
+
+ control.replay();
+
+ scheduler.connected(driver);
+
+ waitUntilCaptured(supplierCapture);
+ assertTrue(supplierCapture.hasCaptured());
+ ExceptionalSupplier<Boolean, RuntimeException> supplier = supplierCapture.getValue();
+
+ // Each attempt should return false.
+ assertFalse(supplier.get());
+ assertFalse(supplier.get());
+
+ // After the callback we should return true because it was successful.
+ scheduler.received(driver, SUBSCRIBED_EVENT);
+
+ assertTrue(supplier.get());
+ }
+
+ @Test(timeout = 300000)
+ public void testAttemptSubscriptionHaltsAfterDisconnection() throws Exception {
+ storageUtil.expectOperations();
+ expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+ expect(infoFactory.getFrameworkInfo()).andReturn(FRAMEWORK_INFO);
+
+ // Other tests already check if what we send is correct.
+ driver.send(anyObject());
+ expectLastCall().once();
+
+ Capture<ExceptionalSupplier<Boolean, RuntimeException>> supplierCapture = createCapture();
+ backoffHelper.doUntilSuccess(capture(supplierCapture));
+ expectLastCall().once();
+
+ handler.handleDisconnection();
+
+ control.replay();
+
+ scheduler.connected(driver);
+
+ waitUntilCaptured(supplierCapture);
+ assertTrue(supplierCapture.hasCaptured());
+ ExceptionalSupplier<Boolean, RuntimeException> supplier = supplierCapture.getValue();
+
+ assertFalse(supplier.get());
+
+ // After disconnection we should stop.
+ scheduler.disconnected(driver);
+
+ assertTrue(supplier.get());
+ }
+
+ private static void waitUntilCaptured(Capture<?> capture) throws Exception {
+ while (!capture.hasCaptured()) {
+ Thread.sleep(1000);
+ }
+ }
+
@Test
public void testDisconnected() {
handler.handleDisconnection();
@@ -272,4 +364,48 @@ public class VersionedMesosSchedulerImplTest extends EasyMockTest {
scheduler.received(driver, FAILED_AGENT_EVENT);
assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_FAILURE"));
}
+
+ @Test(timeout = 300000)
+ public void testSubscribeDisconnectSubscribeCycle() throws Exception {
+ expectFrameworkInfoRead();
+
+ Capture<ExceptionalSupplier<Boolean, RuntimeException>> firstSubscribeAttempt = createCapture();
+ backoffHelper.doUntilSuccess(capture(firstSubscribeAttempt));
+ expectLastCall().once();
+
+ handler.handleRegistration(FRAMEWORK, MASTER);
+ handler.handleDisconnection();
+
+ Capture<ExceptionalSupplier<Boolean, RuntimeException>> secondSubscribeAttempt =
+ createCapture();
+ backoffHelper.doUntilSuccess(capture(secondSubscribeAttempt));
+ expectLastCall().once();
+
+ // Second subscribe should call the reregistration handler.
+ handler.handleReregistration(MASTER);
+
+ control.replay();
+
+ scheduler.connected(driver);
+
+ waitUntilCaptured(firstSubscribeAttempt);
+ assertTrue(firstSubscribeAttempt.hasCaptured());
+
+ scheduler.received(driver, SUBSCRIBED_EVENT);
+ scheduler.disconnected(driver);
+ scheduler.connected(driver);
+
+ waitUntilCaptured(secondSubscribeAttempt);
+ assertTrue(secondSubscribeAttempt.hasCaptured());
+
+ scheduler.received(driver, SUBSCRIBED_EVENT);
+ }
+
+ private void expectFrameworkInfoRead() {
+ storageUtil.expectOperations();
+ expect(storageUtil.schedulerStore.fetchFrameworkId())
+ .andReturn(Optional.of(FRAMEWORK_ID))
+ .anyTimes();
+ expect(infoFactory.getFrameworkInfo()).andReturn(FRAMEWORK_INFO).anyTimes();
+ }
}